dataflow.py 201 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252
  1. # sys.path.append("/data")
  2. from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
  3. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  4. from BaseDataMaintenance.common.multiProcess import MultiHandler
  5. from queue import Queue
  6. from multiprocessing import Queue as PQueue
  7. from BaseDataMaintenance.model.ots.document_tmp import *
  8. from BaseDataMaintenance.model.ots.attachment import *
  9. from BaseDataMaintenance.model.ots.document_html import *
  10. from BaseDataMaintenance.model.ots.document_extract2 import *
  11. from BaseDataMaintenance.model.ots.project import *
  12. from BaseDataMaintenance.model.ots.project2_tmp import *
  13. from BaseDataMaintenance.model.ots.document import *
  14. from BaseDataMaintenance.model.ots.project_process import *
  15. import base64
  16. from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
  17. from uuid import uuid4
  18. from BaseDataMaintenance.common.ossUtils import *
  19. from BaseDataMaintenance.dataSource.source import is_internal,getAuth
  20. from apscheduler.schedulers.blocking import BlockingScheduler
  21. from BaseDataMaintenance.maintenance.dataflow_settings import *
  22. from threading import Thread
  23. import oss2
  24. from BaseDataMaintenance.maxcompute.documentDumplicate import *
  25. from BaseDataMaintenance.maxcompute.documentMerge import *
  26. from BaseDataMaintenance.common.otsUtils import *
  27. from BaseDataMaintenance.common.activateMQUtils import *
  28. from BaseDataMaintenance.dataMonitor.data_monitor import BaseDataMonitor
  29. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  30. def getSet(list_dict,key):
  31. _set = set()
  32. for item in list_dict:
  33. if key in item:
  34. if item[key]!='' and item[key] is not None:
  35. if re.search("^\d[\d\.]*$",item[key]) is not None:
  36. _set.add(str(float(item[key])))
  37. else:
  38. _set.add(str(item[key]))
  39. return _set
  40. def getSimilarityOfString(str1,str2):
  41. _set1 = set()
  42. _set2 = set()
  43. if str1 is not None:
  44. for i in range(1,len(str1)):
  45. _set1.add(str1[i-1:i+1])
  46. if str2 is not None:
  47. for i in range(1,len(str2)):
  48. _set2.add(str2[i-1:i+1])
  49. _len = max(1,min(len(_set1),len(_set2)))
  50. return len(_set1&_set2)/_len
  51. def getDiffIndex(list_dict,key,confidence=100):
  52. _set = set()
  53. for _i in range(len(list_dict)):
  54. item = list_dict[_i]
  55. if item["confidence"]>=confidence:
  56. continue
  57. if key in item:
  58. if item[key]!='' and item[key] is not None:
  59. if re.search("^\d+(\.\d+)?$",item[key]) is not None:
  60. _set.add(str(float(item[key])))
  61. else:
  62. _set.add(str(item[key]))
  63. if len(_set)>1:
  64. return _i
  65. return len(list_dict)
  66. def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
  67. swf_urls = []
  68. try:
  69. list_files = os.listdir(swf_dir)
  70. list_files.sort(key=lambda x:x)
  71. headers = dict()
  72. headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
  73. for _file in list_files:
  74. swf_localpath = "%s/%s"%(swf_dir,_file)
  75. swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
  76. uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
  77. _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
  78. swf_urls.append(_url)
  79. os.remove(swf_localpath)
  80. except Exception as e:
  81. traceback.print_exc()
  82. return swf_urls
  83. class Dataflow():
    def __init__(self):
        """Wire up clients, queues and endpoints for the data pipeline."""
        # Tablestore (OTS) client shared by most read/write paths.
        self.ots_client = getConnect_ots()
        # Stage queues: documents flow through init -> attachment -> extract
        # -> dumplicate -> merge -> syncho/remove.
        self.queue_init = Queue()
        self.queue_attachment = Queue()
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.list_attachment_ocr = []
        self.list_attachment_not_ocr = []
        self.queue_extract = Queue()
        self.list_extract = []
        # dedup stage uses a multiprocessing queue; dumplicate_set tracks
        # the items currently in flight
        self.queue_dumplicate = PQueue()
        self.dumplicate_set = set()
        self.queue_merge = Queue()
        self.queue_syncho = Queue()
        self.queue_remove = Queue()
        self.queue_remove_project = Queue()
        self.attachment_rec_interface = ""
        # dedicated client for the merge stage (keeps it off the shared client)
        self.ots_client_merge = getConnect_ots()
        # choose VPC-internal endpoints when running inside the Aliyun network
        if is_internal:
            self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        if is_internal:
            self.extract_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
            self.industy_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
            self.other_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
        else:
            self.extract_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
            self.industy_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
            self.other_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
        # auth header for the PAI-EAS extraction services above
        self.header = {'Content-Type': 'application/json',"Authorization":"NzZmOWZlMmU2MGY3YmQ4MDBjM2E5MDAyZjhjNjQ0MzZlMmE0NTMwZg=="}
        self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        self.auth = getAuth()
        # size the OSS connection pool for the multi-threaded consumers
        oss2.defaults.connection_pool_size = 100
        oss2.defaults.multiget_num_threads = 20
        log("bucket_url:%s"%(self.bucket_url))
        self.attachment_bucket_name = "attachment-hub"
        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
        # base directory for scratch downloads/swf images
        self.current_path = os.path.dirname(__file__)
    def flow_init(self):
        """Re-seed document_tmp/document_html from the main document table.

        Every document with crtime >= 2022-04-20 is read back, its status is
        mapped to a re-processing status, and the stripped row plus its html
        are written to the tmp tables so the pipeline runs over them again.
        """
        def producer():
            # page through the document index in docid order, feeding queue_init
            bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                ColumnsToGet(return_type=ColumnReturnType.ALL))
            log("flow_init producer total_count:%d"%total_count)
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count = len(list_dict)
            # follow the pagination token until the result set is exhausted
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_dict = getRow_ots(rows)
                for _dict in list_dict:
                    self.queue_init.put(_dict)
                _count += len(list_dict)
        def comsumer():
            # drain queue_init with 30 worker threads through comsumer_handle
            mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
            mt.run()
        def comsumer_handle(item,result_queue,ots_client):
            # keep the html aside, then strip the bulky text columns so the
            # document_tmp row stays small
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in item:
                item.pop(document_tmp_dochtmlcon)
            if document_tmp_doctextcon in item:
                item.pop(document_tmp_doctextcon)
            if document_tmp_attachmenttextcon in item:
                item.pop(document_tmp_attachmenttextcon)
            _status = item.get(document_tmp_status)
            new_status = None
            # NOTE(review): status mapping — 201..300 appears to mean "kept"
            # (save=1) and 401..450 "dropped" (save=0), both re-entering at
            # status 81; anything else restarts from status 1. Confirm against
            # the status table.
            if _status>=201 and _status<=300:
                item[document_tmp_save] = 1
                new_status = 81
            elif _status>=401 and _status<=450:
                item[document_tmp_save] = 0
                new_status = 81
            else:
                new_status = 1
            # new_status = 1
            item[document_tmp_status] = new_status
            dtmp = Document_tmp(item)
            dhtml = Document_html({document_tmp_partitionkey:item.get(document_tmp_partitionkey),
                                   document_tmp_docid:item.get(document_tmp_docid),
                                   document_tmp_dochtmlcon:_dochtmlcon})
            dtmp.update_row(ots_client)
            dhtml.update_row(ots_client)
        producer()
        comsumer()
  173. def getTitleFromHtml(self,filemd5,_html):
  174. _soup = BeautifulSoup(_html,"lxml")
  175. _find = _soup.find("a",attrs={"data":filemd5})
  176. _title = ""
  177. if _find is not None:
  178. _title = _find.get_text()
  179. return _title
  180. def getSourceLinkFromHtml(self,filemd5,_html):
  181. _soup = BeautifulSoup(_html,"lxml")
  182. _find = _soup.find("a",attrs={"filelink":filemd5})
  183. filelink = ""
  184. if _find is None:
  185. _find = _soup.find("img",attrs={"filelink":filemd5})
  186. if _find is not None:
  187. filelink = _find.attrs.get("src","")
  188. else:
  189. filelink = _find.attrs.get("href","")
  190. return filelink
    def request_attachment_interface(self,attach,_dochtmlcon):
        """Download one attachment from OSS, run it through the recognition
        interface and persist the recognized html on the attachment row.

        :param attach: attachment model object (read and updated in place)
        :param _dochtmlcon: document html, used to recover title/source link
        :return: True when no further work is needed (processed, too large,
            or missing from OSS); False when download or recognition failed;
            None on unexpected exceptions (traceback printed only).
        """
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _size = attach.getProperties().get(attachment_size)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        # scratch file for the download; removed in the finally block
        localpath = os.path.join(self.current_path,"download",_uuid.hex)
        docids = attach.getProperties().get(attachment_docids)
        try:
            if _size>ATTACHMENT_LARGESIZE:
                # oversized attachments are marked and skipped, never recognized
                attach.setValue(attachment_status, ATTACHMENT_TOOLARGE)
                log("attachment :%s of path:%s to large"%(filemd5,_path))
                attach.update_row(self.ots_client)
                return True
            else:
                d_start_time = time.time()
                if downloadFile(self.bucket,objectPath,localpath):
                    time_download = time.time()-d_start_time
                    _data_base64 = base64.b64encode(open(localpath,"rb").read())
                    # call the recognition interface on the file content
                    start_time = time.time()
                    _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype,kwargs={"timeout":600})
                    if _success:
                        log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                    else:
                        # alert via DingTalk and give up on this attachment
                        log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False
                    swf_images = eval(swf_images)
                    # swf files come back as base64 page images: write them to a
                    # temp dir, upload once, and store the resulting URLs
                    if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                        swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                        if len(swf_urls)==0:
                            objectPath = attach.getProperties().get(attachment_path,"")
                            # NOTE(review): localpath is re-pointed here, so the
                            # finally block removes this (never created) .swf path
                            # instead of the downloaded file — confirm intended
                            localpath = os.path.join(self.current_path,"download/%s.swf"%(uuid4().hex))
                            swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                            if not os.path.exists(swf_dir):
                                os.mkdir(swf_dir)
                            for _i in range(len(swf_images)):
                                _base = swf_images[_i]
                                _base = base64.b64decode(_base)
                                filename = "swf_page_%d.png"%(_i)
                                filepath = os.path.join(swf_dir,filename)
                                with open(filepath,"wb") as f:
                                    f.write(_base)
                            # transformSWF deletes the page files; rmdir then
                            # removes the (now empty) directory
                            swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                            if os.path.exists(swf_dir):
                                os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    # any "<td" occurrence is taken as evidence of a table
                    if re.search("<td",_html) is not None:
                        attach.setValue(attachment_has_table,1,True)
                    _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                    filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                    if _file_title!="":
                        attach.setValue(attachment_file_title,_file_title,True)
                    if filelink!="":
                        attach.setValue(attachment_file_link,filelink,True)
                    attach.setValue(attachment_attachmenthtml,_html,True)
                    attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                    attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                    attach.setValue(attachment_recsize,len(_html),True)
                    attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                    attach.update_row(self.ots_client) # update enabled again in production
                    return True
                else:
                    return False
        except oss2.exceptions.NotFound as e:
            # object missing from OSS: treat as done so the document can proceed
            return True
        except Exception as e:
            traceback.print_exc()
        finally:
            # best-effort cleanup of the downloaded scratch file
            try:
                os.remove(localpath)
            except:
                pass
  268. def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
  269. list_html = []
  270. swf_urls = []
  271. for _attach in list_attach:
  272. #测试全跑
  273. if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
  274. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  275. if _html is None:
  276. _html = ""
  277. list_html.append(_html)
  278. else:
  279. _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
  280. if not _succeed:
  281. return False,"",[]
  282. _html = _attach.getProperties().get(attachment_attachmenthtml,"")
  283. if _html is None:
  284. _html = ""
  285. list_html.append(_html)
  286. if _attach.getProperties().get(attachment_filetype)=="swf":
  287. swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
  288. return True,list_html,swf_urls
  289. def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
  290. set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
  291. set_range=set(["page_time","status"]),set_phrase=set(["doctitle"])):
  292. list_must_queries = []
  293. list_must_no_queries = []
  294. for k,v in _dict.items():
  295. if k in set_match:
  296. if isinstance(v,str):
  297. l_s = []
  298. for s_v in v.split(","):
  299. l_s.append(MatchQuery(k,s_v))
  300. list_must_queries.append(BoolQuery(should_queries=l_s))
  301. elif k in set_nested:
  302. _v = v
  303. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  304. _v = float(_v)
  305. list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  306. elif k in set_term:
  307. list_must_queries.append(TermQuery(k,v))
  308. elif k in set_phrase:
  309. list_must_queries.append(MatchPhraseQuery(k,v))
  310. elif k in set_range:
  311. if len(v)==1:
  312. list_must_queries.append(RangeQuery(k,v[0]))
  313. elif len(v)==2:
  314. list_must_queries.append(RangeQuery(k,v[0],v[1],True,True))
  315. for k,v in _dict_must_not.items():
  316. if k in set_match:
  317. if isinstance(v,str):
  318. l_s = []
  319. for s_v in v.split(","):
  320. l_s.append(MatchQuery(k,s_v))
  321. list_must_no_queries.append(BoolQuery(should_queries=l_s))
  322. elif k in set_nested:
  323. _v = v
  324. if k!="" and k=="bidding_budget" or k=="win_bid_price":
  325. _v = float(_v)
  326. list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
  327. elif k in set_term:
  328. list_must_no_queries.append(TermQuery(k,v))
  329. elif k in set_range:
  330. if len(v)==1:
  331. list_must_no_queries.append(RangeQuery(k,v[0]))
  332. elif len(v)==2:
  333. list_must_no_queries.append(RangeQuery(k,v[0],v[1],True,True))
  334. return BoolQuery(must_queries=list_must_queries,must_not_queries=list_must_no_queries)
  335. def f_decode_sub_docs_json(self, project_code,project_name,tenderee,agency,sub_docs_json):
  336. columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
  337. extract_count = 0
  338. if project_code is not None and project_code!="":
  339. extract_count += 1
  340. if project_name is not None and project_name!="":
  341. extract_count += 1
  342. if tenderee is not None and tenderee!="":
  343. extract_count += 1
  344. if agency is not None and agency!="":
  345. extract_count += 1
  346. if sub_docs_json is not None:
  347. sub_docs = json.loads(sub_docs_json)
  348. sub_docs.sort(key=lambda x:float(x.get("bidding_budget",0)),reverse=True)
  349. sub_docs.sort(key=lambda x:float(x.get("win_bid_price",0)),reverse=True)
  350. # log("==%s"%(str(sub_docs)))
  351. for sub_docs in sub_docs:
  352. for _key_sub_docs in sub_docs.keys():
  353. extract_count += 1
  354. if _key_sub_docs in columns:
  355. if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
  356. if _key_sub_docs in ["bidding_budget","win_bid_price"]:
  357. if float(sub_docs[_key_sub_docs])>0:
  358. columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
  359. else:
  360. columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
  361. return columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count
  362. def post_extract(self,_dict):
  363. win_tenderer,bidding_budget,win_bid_price,extract_count = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  364. _dict["win_tenderer"] = win_tenderer
  365. _dict["bidding_budget"] = bidding_budget
  366. _dict["win_bid_price"] = win_bid_price
  367. if "extract_count" not in _dict:
  368. _dict["extract_count"] = extract_count
  369. def get_dump_columns(self,_dict):
  370. docchannel = _dict.get(document_tmp_docchannel,0)
  371. project_code = _dict.get(document_tmp_project_code,"")
  372. project_name = _dict.get(document_tmp_project_name,"")
  373. tenderee = _dict.get(document_tmp_tenderee,"")
  374. agency = _dict.get(document_tmp_agency,"")
  375. doctitle_refine = _dict.get(document_tmp_doctitle_refine,"")
  376. win_tenderer = _dict.get("win_tenderer","")
  377. bidding_budget = _dict.get("bidding_budget","")
  378. if bidding_budget==0:
  379. bidding_budget = ""
  380. win_bid_price = _dict.get("win_bid_price","")
  381. if win_bid_price==0:
  382. win_bid_price = ""
  383. page_time = _dict.get(document_tmp_page_time,"")
  384. fingerprint = _dict.get(document_tmp_fingerprint,"")
  385. product = _dict.get(document_tmp_product,"")
  386. return docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product
  387. def f_set_docid_limitNum_contain(self,item, _split,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"]):
  388. flag = True
  389. for _key in singleNum_keys:
  390. if len(getSet(_split,_key))>1:
  391. flag = False
  392. break
  393. for _key in multiNum_keys:
  394. if len(getSet(_split,_key))<=1:
  395. flag = False
  396. break
  397. project_code = item.get("project_code","")
  398. for _key in notlike_keys:
  399. if not flag:
  400. break
  401. for _d in _split:
  402. _key_v = _d.get(_key,"")
  403. _sim = getSimilarityOfString(project_code,_key_v)
  404. if _sim>0.7 and _sim<1:
  405. flag = False
  406. break
  407. #判断组内每条公告是否包含
  408. if flag:
  409. if len(contain_keys)>0:
  410. for _key in contain_keys:
  411. MAX_CONTAIN_COLUMN = None
  412. for _d in _split:
  413. contain_column = _d.get(_key)
  414. if contain_column is not None and contain_column !="":
  415. if MAX_CONTAIN_COLUMN is None:
  416. MAX_CONTAIN_COLUMN = contain_column
  417. else:
  418. if len(MAX_CONTAIN_COLUMN)<len(contain_column):
  419. if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
  420. flag = False
  421. break
  422. MAX_CONTAIN_COLUMN = contain_column
  423. else:
  424. if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
  425. flag = False
  426. break
  427. if flag:
  428. return _split
  429. return []
def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count,document_tmp_doctitle]):
    """Run one duplicate-candidate query against OTS and validate the group.

    :param item: reference document, forwarded to the group validator.
    :param _query: an OTS query object, or a list of queries that are OR-ed
        together via BoolQuery(should_queries=...).
    :param confidence: rule confidence, stamped onto every returned row.
    :return: the candidate rows when the group passes
        f_set_docid_limitNum_contain's constraints, otherwise [].
    """
    list_data = []
    if isinstance(_query,list):
        # a list means "any of these queries may match"
        bool_query = BoolQuery(should_queries=_query)
    else:
        bool_query = _query
    # single page of up to 50 rows, sorted by sort_column (default docid)
    rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=50,get_total_count=True),
        ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        # decode sub_docs_json fields and tag each row with this rule's confidence
        self.post_extract(_dict)
        _dict["confidence"] = confidence
        # NOTE(review): list_data is populated but never returned — it only
        # fed the pagination loop below, which is currently disabled
        list_data.append(_dict)
    # _count = len(list_dict)
    # while next_token:
    #     rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
    #         SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
    #         ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    #     list_dict = getRow_ots(rows)
    #     for _dict in list_dict:
    #         self.post_extract(_dict)
    #         _dict["confidence"] = confidence
    #         list_data.append(_dict)
    # group-level validation: returns list_dict unchanged or []
    list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
    return list_dict
  456. def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
  457. list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
  458. for _dict in list_dict:
  459. self.post_extract(_dict)
  460. _docid = _dict.get(document_tmp_docid)
  461. if _docid not in set_docid:
  462. base_list.append(_dict)
  463. set_docid.add(_docid)
  464. def translate_dumplicate_rules(self,status_from,item):
  465. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  466. if page_time=='':
  467. page_time = getCurrent_date("%Y-%m-%d")
  468. base_dict = {
  469. "status":[status_from[0]],
  470. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  471. }
  472. must_not_dict = {"save":0}
  473. list_rules = []
  474. singleNum_keys = ["tenderee","win_tenderer"]
  475. if fingerprint!="":
  476. _dict = {}
  477. confidence = 100
  478. _dict[document_tmp_fingerprint] = fingerprint
  479. _dict.update(base_dict)
  480. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  481. _rule = {"confidence":confidence,
  482. "item":item,
  483. "query":_query,
  484. "singleNum_keys":[],
  485. "contain_keys":[],
  486. "multiNum_keys":[]}
  487. list_rules.append(_rule)
  488. if docchannel in (52,118):
  489. if bidding_budget!="" and tenderee!="" and project_code!="":
  490. confidence = 90
  491. _dict = {document_tmp_docchannel:docchannel,
  492. "bidding_budget":item.get("bidding_budget"),
  493. document_tmp_tenderee:item.get(document_tmp_tenderee,""),
  494. document_tmp_project_code:item.get(document_tmp_project_code,"")
  495. }
  496. _dict.update(base_dict)
  497. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  498. _rule = {"confidence":confidence,
  499. "query":_query,
  500. "singleNum_keys":singleNum_keys,
  501. "contain_keys":[],
  502. "multiNum_keys":[document_tmp_web_source_no]}
  503. list_rules.append(_rule)
  504. if doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  505. confidence = 80
  506. _dict = {document_tmp_docchannel:docchannel,
  507. "doctitle_refine":doctitle_refine,
  508. "tenderee":tenderee,
  509. bidding_budget:"bidding_budget"
  510. }
  511. _dict.update(base_dict)
  512. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  513. _rule = {"confidence":confidence,
  514. "query":_query,
  515. "singleNum_keys":singleNum_keys,
  516. "contain_keys":[],
  517. "multiNum_keys":[document_tmp_web_source_no]}
  518. list_rules.append(_rule)
  519. if project_code!="" and doctitle_refine!="" and agency!="" and bidding_budget!="":
  520. confidence = 90
  521. _dict = {document_tmp_docchannel:docchannel,
  522. "project_code":project_code,
  523. "doctitle_refine":doctitle_refine,
  524. "agency":agency,
  525. "bidding_budget":bidding_budget
  526. }
  527. _dict.update(base_dict)
  528. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  529. _rule = {"confidence":confidence,
  530. "query":_query,
  531. "singleNum_keys":singleNum_keys,
  532. "contain_keys":[],
  533. "multiNum_keys":[document_tmp_web_source_no]}
  534. list_rules.append(_rule)
  535. if project_code!="" and tenderee!="" and bidding_budget!="":
  536. confidence = 91
  537. _dict = {document_tmp_docchannel:docchannel,
  538. "project_code":project_code,
  539. "tenderee":tenderee,
  540. "bidding_budget":bidding_budget
  541. }
  542. _dict.update(base_dict)
  543. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  544. _rule = {"confidence":confidence,
  545. "query":_query,
  546. "singleNum_keys":singleNum_keys,
  547. "contain_keys":[],
  548. "multiNum_keys":[document_tmp_web_source_no]}
  549. list_rules.append(_rule)
  550. if doctitle_refine!="" and agency!="" and bidding_budget!="":
  551. confidence = 71
  552. _dict = {document_tmp_docchannel:docchannel,
  553. "doctitle_refine":doctitle_refine,
  554. "agency":agency,
  555. "bidding_budget":bidding_budget
  556. }
  557. _dict.update(base_dict)
  558. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  559. _rule = {"confidence":confidence,
  560. "query":_query,
  561. "singleNum_keys":singleNum_keys,
  562. "contain_keys":[],
  563. "multiNum_keys":[document_tmp_web_source_no]}
  564. list_rules.append(_rule)
  565. if project_code!="" and project_name!="" and agency!="" and bidding_budget!="":
  566. confidence = 91
  567. _dict = {document_tmp_docchannel:docchannel,
  568. "project_code":project_code,
  569. "project_name":project_name,
  570. "agency":agency,
  571. "bidding_budget":bidding_budget
  572. }
  573. _dict.update(base_dict)
  574. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  575. n_singleKeys = [i for i in singleNum_keys]
  576. n_singleKeys.append(document_tmp_web_source_no)
  577. _rule = {"confidence":confidence,
  578. "query":_query,
  579. "singleNum_keys":n_singleKeys,
  580. "contain_keys":[],
  581. "multiNum_keys":[]}
  582. list_rules.append(_rule)
  583. ##-- 5. 招标公告 - 同项目编号- 同[项目名称、标题] - 同[招标人、代理公司] - 同预算(!=0) - 同信息源=1
  584. if project_code!="" and project_name!="" and tenderee!="" and bidding_budget!="":
  585. confidence = 91
  586. _dict = {document_tmp_docchannel:docchannel,
  587. "project_code":project_code,
  588. "project_name":project_name,
  589. "tenderee":tenderee,
  590. "bidding_budget":bidding_budget
  591. }
  592. _dict.update(base_dict)
  593. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  594. n_singleKeys = [i for i in singleNum_keys]
  595. n_singleKeys.append(document_tmp_web_source_no)
  596. _rule = {"confidence":confidence,
  597. "query":_query,
  598. "singleNum_keys":n_singleKeys,
  599. "contain_keys":[],
  600. "multiNum_keys":[]}
  601. list_rules.append(_rule)
  602. if project_code!="" and doctitle_refine!="" and tenderee!="" and bidding_budget!="":
  603. confidence = 71
  604. _dict = {document_tmp_docchannel:docchannel,
  605. "project_code":project_code,
  606. "doctitle_refine":doctitle_refine,
  607. "tenderee":tenderee,
  608. "bidding_budget":bidding_budget
  609. }
  610. _dict.update(base_dict)
  611. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  612. _rule = {"confidence":confidence,
  613. "query":_query,
  614. "singleNum_keys":singleNum_keys,
  615. "contain_keys":[],
  616. "multiNum_keys":[document_tmp_web_source_no]}
  617. list_rules.append(_rule)
  618. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  619. if project_name!="" and agency!="":
  620. tmp_bidding = 0
  621. if bidding_budget!="":
  622. tmp_bidding = bidding_budget
  623. confidence = 51
  624. _dict = {document_tmp_docchannel:docchannel,
  625. "project_name":project_name,
  626. "agency":agency,
  627. "bidding_budget":tmp_bidding
  628. }
  629. _dict.update(base_dict)
  630. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  631. _rule = {"confidence":confidence,
  632. "query":_query,
  633. "singleNum_keys":singleNum_keys,
  634. "contain_keys":[],
  635. "multiNum_keys":[document_tmp_web_source_no]}
  636. list_rules.append(_rule)
  637. #-- 4. 招标公告 - 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 信息源>1
  638. if project_code!="" and agency!="":
  639. tmp_bidding = 0
  640. if bidding_budget!="":
  641. tmp_bidding = bidding_budget
  642. confidence = 51
  643. _dict = {document_tmp_docchannel:docchannel,
  644. "project_code":project_code,
  645. "agency":agency,
  646. "bidding_budget":tmp_bidding
  647. }
  648. _dict.update(base_dict)
  649. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  650. _rule = {"confidence":confidence,
  651. "query":_query,
  652. "singleNum_keys":singleNum_keys,
  653. "contain_keys":[],
  654. "multiNum_keys":[document_tmp_web_source_no]}
  655. list_rules.append(_rule)
  656. if docchannel not in (101,119,120):
  657. #-- 7. 非中标公告 - 同项目名称 - 同发布日期 - 同招标人 - 同预算 - 同类型 - 信息源>1 - 同项目编号
  658. if project_name!="" and tenderee!="" and project_code!="":
  659. tmp_bidding = 0
  660. if bidding_budget!="":
  661. tmp_bidding = bidding_budget
  662. confidence = 51
  663. _dict = {document_tmp_docchannel:docchannel,
  664. "project_name":project_name,
  665. "tenderee":tenderee,
  666. "project_code":project_code
  667. }
  668. _dict.update(base_dict)
  669. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  670. _rule = {"confidence":confidence,
  671. "query":_query,
  672. "singleNum_keys":singleNum_keys,
  673. "contain_keys":[],
  674. "multiNum_keys":[document_tmp_web_source_no]}
  675. list_rules.append(_rule)
  676. if docchannel in (101,119,120):
  677. #-- 3. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(==0)
  678. if project_code!="" and project_name!="" and win_tenderer!="":
  679. tmp_win = 0
  680. if win_bid_price!="":
  681. tmp_win = win_bid_price
  682. confidence = 61
  683. _dict = {document_tmp_docchannel:docchannel,
  684. "project_code":project_code,
  685. "project_name":project_name,
  686. "win_tenderer":win_tenderer,
  687. "win_bid_price":tmp_win
  688. }
  689. _dict.update(base_dict)
  690. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  691. _rule = {"confidence":confidence,
  692. "query":_query,
  693. "singleNum_keys":singleNum_keys,
  694. "contain_keys":[],
  695. "multiNum_keys":[]}
  696. list_rules.append(_rule)
  697. if project_code!="" and project_name!="" and bidding_budget!="" and product!="":
  698. confidence = 72
  699. _dict = {document_tmp_docchannel:docchannel,
  700. "project_code":project_code,
  701. "project_name":project_name,
  702. "bidding_budget":bidding_budget,
  703. "product":product
  704. }
  705. _dict.update(base_dict)
  706. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  707. n_singleKeys = [i for i in singleNum_keys]
  708. n_singleKeys.append(document_tmp_web_source_no)
  709. _rule = {"confidence":confidence,
  710. "query":_query,
  711. "singleNum_keys":n_singleKeys,
  712. "contain_keys":[],
  713. "multiNum_keys":[]}
  714. list_rules.append(_rule)
  715. if project_code!='' and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  716. confidence = 91
  717. _dict = {document_tmp_docchannel:docchannel,
  718. "project_code":project_code,
  719. "doctitle_refine":doctitle_refine,
  720. "win_tenderer":win_tenderer,
  721. "win_bid_price":win_bid_price
  722. }
  723. _dict.update(base_dict)
  724. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  725. n_singleKeys = [i for i in singleNum_keys]
  726. n_singleKeys.append(document_tmp_web_source_no)
  727. _rule = {"confidence":confidence,
  728. "query":_query,
  729. "singleNum_keys":n_singleKeys,
  730. "contain_keys":[],
  731. "multiNum_keys":[]}
  732. list_rules.append(_rule)
  733. ##-- 2. 中标公告 - 同项目编号- 同[项目名称、标题] - 同中标人 - 同中标价(!=0) - 同信息源=1
  734. if project_code!="" and project_name!="" and win_tenderer!="" and win_bid_price!="":
  735. confidence = 91
  736. _dict = {document_tmp_docchannel:docchannel,
  737. "project_code":project_code,
  738. "project_name":project_name,
  739. "win_tenderer":win_tenderer,
  740. "win_bid_price":win_bid_price
  741. }
  742. _dict.update(base_dict)
  743. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  744. n_singleKeys = [i for i in singleNum_keys]
  745. n_singleKeys.append(document_tmp_web_source_no)
  746. _rule = {"confidence":confidence,
  747. "query":_query,
  748. "singleNum_keys":n_singleKeys,
  749. "contain_keys":[],
  750. "multiNum_keys":[]}
  751. list_rules.append(_rule)
  752. if project_name!="" and win_tenderer!="" and win_bid_price!="":
  753. confidence = 91
  754. _dict = {document_tmp_docchannel:docchannel,
  755. "project_name":project_name,
  756. "win_tenderer":win_tenderer,
  757. "win_bid_price":win_bid_price,
  758. }
  759. _dict.update(base_dict)
  760. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  761. _rule = {"confidence":confidence,
  762. "query":_query,
  763. "singleNum_keys":singleNum_keys,
  764. "contain_keys":[],
  765. "multiNum_keys":[document_tmp_web_source_no]}
  766. list_rules.append(_rule)
  767. if project_code!="" and win_tenderer!="" and win_bid_price!="":
  768. confidence = 91
  769. _dict = {document_tmp_docchannel:docchannel,
  770. "project_code":project_code,
  771. "win_tenderer":win_tenderer,
  772. "win_bid_price":win_bid_price,
  773. }
  774. _dict.update(base_dict)
  775. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  776. _rule = {"confidence":confidence,
  777. "query":_query,
  778. "singleNum_keys":singleNum_keys,
  779. "contain_keys":[],
  780. "multiNum_keys":[document_tmp_web_source_no]}
  781. list_rules.append(_rule)
  782. if project_code!="" and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  783. confidence = 91
  784. _dict = {document_tmp_docchannel:docchannel,
  785. "project_code":project_code,
  786. "doctitle_refine":doctitle_refine,
  787. "win_tenderer":win_tenderer,
  788. "win_bid_price":win_bid_price
  789. }
  790. _dict.update(base_dict)
  791. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  792. n_singleKeys = [i for i in singleNum_keys]
  793. n_singleKeys.append(document_tmp_web_source_no)
  794. _rule = {"confidence":confidence,
  795. "query":_query,
  796. "singleNum_keys":n_singleKeys,
  797. "contain_keys":[],
  798. "multiNum_keys":[]}
  799. list_rules.append(_rule)
  800. if doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
  801. confidence=90
  802. _dict = {document_tmp_docchannel:docchannel,
  803. "doctitle_refine":doctitle_refine,
  804. "win_tenderer":win_tenderer,
  805. "win_bid_price":win_bid_price
  806. }
  807. _dict.update(base_dict)
  808. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  809. _rule = {"confidence":confidence,
  810. "query":_query,
  811. "singleNum_keys":singleNum_keys,
  812. "contain_keys":[],
  813. "multiNum_keys":[document_tmp_web_source_no]}
  814. list_rules.append(_rule)
  815. if project_name!="" and win_tenderer!="" and win_bid_price!="" and project_code!="":
  816. confidence=95
  817. _dict = {document_tmp_docchannel:docchannel,
  818. "project_name":project_name,
  819. "win_tenderer":win_tenderer,
  820. "win_bid_price":win_bid_price,
  821. "project_code":project_code
  822. }
  823. _dict.update(base_dict)
  824. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  825. _rule = {"confidence":confidence,
  826. "query":_query,
  827. "singleNum_keys":singleNum_keys,
  828. "contain_keys":[],
  829. "multiNum_keys":[document_tmp_web_source_no]}
  830. list_rules.append(_rule)
  831. if docchannel in (51,103,115,116):
  832. #9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  833. if doctitle_refine!="" and tenderee!="":
  834. tmp_budget = 0
  835. if bidding_budget!="":
  836. tmp_budget = bidding_budget
  837. confidence=81
  838. _dict = {document_tmp_docchannel:docchannel,
  839. "doctitle_refine":doctitle_refine,
  840. "tenderee":tenderee,
  841. "bidding_budget":tmp_budget,
  842. }
  843. _dict.update(base_dict)
  844. _dict["page_time"] = [page_time,page_time]
  845. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  846. _rule = {"confidence":confidence,
  847. "query":_query,
  848. "singleNum_keys":singleNum_keys,
  849. "contain_keys":[],
  850. "multiNum_keys":[document_tmp_web_source_no]}
  851. list_rules.append(_rule)
  852. #-- 9.同['公告变更','拍卖出让','土地矿产','招标答疑']- 同[标题 、项目编号、项目名称]- 同[招标人、代理公司] - 同预算 - 同一天 - 不同数据源
  853. if project_code!="" and tenderee!="":
  854. confidence=81
  855. tmp_budget = 0
  856. if bidding_budget!="":
  857. tmp_budget = bidding_budget
  858. _dict = {document_tmp_docchannel:docchannel,
  859. "project_code":project_code,
  860. "tenderee":tenderee,
  861. "bidding_budget":tmp_budget,
  862. }
  863. _dict.update(base_dict)
  864. _dict["page_time"] = [page_time,page_time]
  865. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  866. _rule = {"confidence":confidence,
  867. "query":_query,
  868. "singleNum_keys":singleNum_keys,
  869. "contain_keys":[],
  870. "multiNum_keys":[document_tmp_web_source_no]}
  871. list_rules.append(_rule)
  872. if project_name!="" and tenderee!="":
  873. confidence=81
  874. tmp_budget = 0
  875. if bidding_budget!="":
  876. tmp_budget = bidding_budget
  877. _dict = {document_tmp_docchannel:docchannel,
  878. "project_name":project_name,
  879. "tenderee":tenderee,
  880. "bidding_budget":tmp_budget,
  881. }
  882. _dict.update(base_dict)
  883. _dict["page_time"] = [page_time,page_time]
  884. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  885. _rule = {"confidence":confidence,
  886. "query":_query,
  887. "singleNum_keys":singleNum_keys,
  888. "contain_keys":[],
  889. "multiNum_keys":[document_tmp_web_source_no]}
  890. list_rules.append(_rule)
  891. if agency!="" and tenderee!="":
  892. confidence=81
  893. tmp_budget = 0
  894. if bidding_budget!="":
  895. tmp_budget = bidding_budget
  896. _dict = {document_tmp_docchannel:docchannel,
  897. "agency":agency,
  898. "tenderee":tenderee,
  899. "bidding_budget":tmp_budget,
  900. "product":product
  901. }
  902. _dict.update(base_dict)
  903. _dict["page_time"] = [page_time,page_time]
  904. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  905. _rule = {"confidence":confidence,
  906. "query":_query,
  907. "singleNum_keys":singleNum_keys,
  908. "contain_keys":[],
  909. "multiNum_keys":[document_tmp_web_source_no]}
  910. list_rules.append(_rule)
  911. if agency!="" and project_code!="":
  912. confidence=81
  913. tmp_budget = 0
  914. if bidding_budget!="":
  915. tmp_budget = bidding_budget
  916. _dict = {document_tmp_docchannel:docchannel,
  917. "agency":agency,
  918. "project_code":project_code,
  919. "bidding_budget":tmp_budget,
  920. "product":product
  921. }
  922. _dict.update(base_dict)
  923. _dict["page_time"] = [page_time,page_time]
  924. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  925. _rule = {"confidence":confidence,
  926. "query":_query,
  927. "singleNum_keys":singleNum_keys,
  928. "contain_keys":[],
  929. "multiNum_keys":[document_tmp_web_source_no]}
  930. list_rules.append(_rule)
  931. if agency!="" and project_name!="":
  932. confidence=81
  933. tmp_budget = 0
  934. if bidding_budget!="":
  935. tmp_budget = bidding_budget
  936. _dict = {document_tmp_docchannel:docchannel,
  937. "agency":agency,
  938. "project_name":project_name,
  939. "bidding_budget":tmp_budget,
  940. "product":product
  941. }
  942. _dict.update(base_dict)
  943. _dict["page_time"] = [page_time,page_time]
  944. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  945. _rule = {"confidence":confidence,
  946. "query":_query,
  947. "singleNum_keys":singleNum_keys,
  948. "contain_keys":[],
  949. "multiNum_keys":[document_tmp_web_source_no]}
  950. list_rules.append(_rule)
  951. #五选二
  952. if tenderee!="" and bidding_budget!="" and product!="":
  953. confidence=80
  954. _dict = {document_tmp_docchannel:docchannel,
  955. "tenderee":tenderee,
  956. "bidding_budget":bidding_budget,
  957. "product":product,
  958. }
  959. _dict.update(base_dict)
  960. _dict["page_time"] = [page_time,page_time]
  961. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  962. _rule = {"confidence":confidence,
  963. "query":_query,
  964. "singleNum_keys":singleNum_keys,
  965. "contain_keys":[],
  966. "multiNum_keys":[]}
  967. list_rules.append(_rule)
  968. if tenderee!="" and win_tenderer!="" and product!="":
  969. confidence=80
  970. _dict = {document_tmp_docchannel:docchannel,
  971. "tenderee":tenderee,
  972. "win_tenderer":win_tenderer,
  973. "product":product,
  974. }
  975. _dict.update(base_dict)
  976. _dict["page_time"] = [page_time,page_time]
  977. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  978. _rule = {"confidence":confidence,
  979. "query":_query,
  980. "singleNum_keys":singleNum_keys,
  981. "contain_keys":[],
  982. "multiNum_keys":[]}
  983. list_rules.append(_rule)
  984. if tenderee!="" and win_bid_price!="":
  985. confidence=80
  986. _dict = {document_tmp_docchannel:docchannel,
  987. "tenderee":tenderee,
  988. "win_bid_price":win_bid_price,
  989. "product":product,
  990. }
  991. _dict.update(base_dict)
  992. _dict["page_time"] = [page_time,page_time]
  993. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  994. _rule = {"confidence":confidence,
  995. "query":_query,
  996. "singleNum_keys":singleNum_keys,
  997. "contain_keys":[],
  998. "multiNum_keys":[]}
  999. list_rules.append(_rule)
  1000. if tenderee!="" and agency!="":
  1001. confidence=80
  1002. _dict = {document_tmp_docchannel:docchannel,
  1003. "tenderee":tenderee,
  1004. "agency":agency,
  1005. "product":product,
  1006. }
  1007. _dict.update(base_dict)
  1008. _dict["page_time"] = [page_time,page_time]
  1009. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1010. _rule = {"confidence":confidence,
  1011. "query":_query,
  1012. "singleNum_keys":singleNum_keys,
  1013. "contain_keys":[],
  1014. "multiNum_keys":[]}
  1015. list_rules.append(_rule)
  1016. if win_tenderer!="" and bidding_budget!="":
  1017. confidence=80
  1018. _dict = {document_tmp_docchannel:docchannel,
  1019. "win_tenderer":win_tenderer,
  1020. "bidding_budget":bidding_budget,
  1021. "product":product,
  1022. }
  1023. _dict.update(base_dict)
  1024. _dict["page_time"] = [page_time,page_time]
  1025. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1026. _rule = {"confidence":confidence,
  1027. "query":_query,
  1028. "singleNum_keys":singleNum_keys,
  1029. "contain_keys":[],
  1030. "multiNum_keys":[]}
  1031. list_rules.append(_rule)
  1032. if win_bid_price!="" and bidding_budget!="":
  1033. confidence=80
  1034. _dict = {document_tmp_docchannel:docchannel,
  1035. "win_bid_price":win_bid_price,
  1036. "bidding_budget":bidding_budget,
  1037. "product":product,
  1038. }
  1039. _dict.update(base_dict)
  1040. _dict["page_time"] = [page_time,page_time]
  1041. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1042. _rule = {"confidence":confidence,
  1043. "query":_query,
  1044. "singleNum_keys":singleNum_keys,
  1045. "contain_keys":[],
  1046. "multiNum_keys":[]}
  1047. list_rules.append(_rule)
  1048. if agency!="" and bidding_budget!="":
  1049. confidence=80
  1050. _dict = {document_tmp_docchannel:docchannel,
  1051. "agency":agency,
  1052. "bidding_budget":bidding_budget,
  1053. "product":product,
  1054. }
  1055. _dict.update(base_dict)
  1056. _dict["page_time"] = [page_time,page_time]
  1057. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1058. _rule = {"confidence":confidence,
  1059. "query":_query,
  1060. "singleNum_keys":singleNum_keys,
  1061. "contain_keys":[],
  1062. "multiNum_keys":[]}
  1063. list_rules.append(_rule)
  1064. if win_tenderer!="" and win_bid_price!="":
  1065. confidence=80
  1066. _dict = {document_tmp_docchannel:docchannel,
  1067. "win_tenderer":win_tenderer,
  1068. "win_bid_price":win_bid_price,
  1069. "product":product,
  1070. }
  1071. _dict.update(base_dict)
  1072. _dict["page_time"] = [page_time,page_time]
  1073. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1074. _rule = {"confidence":confidence,
  1075. "query":_query,
  1076. "singleNum_keys":singleNum_keys,
  1077. "contain_keys":[],
  1078. "multiNum_keys":[]}
  1079. list_rules.append(_rule)
  1080. if win_tenderer!="" and agency!="":
  1081. confidence=80
  1082. _dict = {document_tmp_docchannel:docchannel,
  1083. "win_tenderer":win_tenderer,
  1084. "agency":agency,
  1085. "product":product,
  1086. }
  1087. _dict.update(base_dict)
  1088. _dict["page_time"] = [page_time,page_time]
  1089. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1090. _rule = {"confidence":confidence,
  1091. "query":_query,
  1092. "singleNum_keys":singleNum_keys,
  1093. "contain_keys":[],
  1094. "multiNum_keys":[]}
  1095. list_rules.append(_rule)
  1096. if doctitle_refine!="" and product!="" and len(doctitle_refine)>7:
  1097. confidence=80
  1098. _dict = {document_tmp_docchannel:docchannel,
  1099. "doctitle_refine":doctitle_refine,
  1100. "product":product,
  1101. }
  1102. _dict.update(base_dict)
  1103. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  1104. _rule = {"confidence":confidence,
  1105. "query":_query,
  1106. "singleNum_keys":singleNum_keys,
  1107. "contain_keys":[],
  1108. "multiNum_keys":[]}
  1109. list_rules.append(_rule)
  1110. return list_rules
  1111. def dumplicate_fianl_check(self,base_list):
  1112. the_group = base_list
  1113. the_group.sort(key=lambda x:x["confidence"],reverse=True)
  1114. if len(the_group)>10:
  1115. keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
  1116. else:
  1117. keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
  1118. #置信度
  1119. list_key_index = []
  1120. for _k in keys:
  1121. if _k=="doctitle":
  1122. list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
  1123. else:
  1124. list_key_index.append(getDiffIndex(the_group,_k))
  1125. _index = min(list_key_index)
  1126. if _index>1:
  1127. return the_group[:_index]
  1128. return []
  1129. def get_best_docid(self,base_list):
  1130. to_reverse = False
  1131. dict_source_count = {}
  1132. for _item in base_list:
  1133. _web_source = _item.get(document_tmp_web_source_no)
  1134. _fingerprint = _item.get(document_tmp_fingerprint)
  1135. if _web_source is not None:
  1136. if _web_source not in dict_source_count:
  1137. dict_source_count[_web_source] = set()
  1138. dict_source_count[_web_source].add(_fingerprint)
  1139. if len(dict_source_count[_web_source])>=2:
  1140. to_reverse=True
  1141. if len(base_list)>0:
  1142. base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
  1143. base_list.sort(key=lambda x:x.get(document_attachment_extract_status,0),reverse=True)
  1144. base_list.sort(key=lambda x:x["extract_count"],reverse=True)
  1145. return base_list[0]["docid"]
  1146. def save_dumplicate(self,base_list,best_docid,status_from,status_to):
  1147. #best_docid need check while others can save directly
  1148. list_dict = []
  1149. for item in base_list:
  1150. docid = item["docid"]
  1151. _dict = {"partitionkey":item["partitionkey"],
  1152. "docid":item["docid"]}
  1153. if docid==best_docid:
  1154. if item.get("save",1)!=0:
  1155. _dict["save"] = 1
  1156. else:
  1157. _dict["save"] = 0
  1158. if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
  1159. _dict["status"] = random.randint(status_to[0],status_to[1])
  1160. list_dict.append(_dict)
  1161. for _dict in list_dict:
  1162. dtmp = Document_tmp(_dict)
  1163. dtmp.update_row(self.ots_client)
def flow_test(self,status_to=[1,10]):
    """Ad-hoc maintenance flow: reset the status of document_tmp rows that
    declare page attachments but have not been attachment-extracted yet.

    NOTE(review): mutable default argument ``status_to=[1,10]`` kept for
    interface compatibility — callers must not mutate it.
    """
    def producer():
        # rows with at least one attachment fileMd5, excluding rows already
        # extracted (attachment_extract_status==1) or in status 1..11
        bool_query = BoolQuery(must_queries=[
            # ExistsQuery("docid"),
            # RangeQuery("crtime",range_to='2022-04-10'),
            # RangeQuery("status",61),
            NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
        ],
        must_not_queries=[
            # NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
            TermQuery("attachment_extract_status",1),
            RangeQuery("status",1,11)
        ]
        )
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
            ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
        log("flow_init producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_init.put(_dict)
        _count = len(list_dict)
        # page through the remaining matches, hard-capped at 1,000,000 rows
        while next_token and _count<1000000:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_init.put(_dict)
            _count += len(list_dict)
            print("%d/%d"%(_count,total_count))
    def comsumer():
        # 30 worker threads drain queue_init through comsumer_handle
        mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        # print(item)
        # reset the row's status to a random value inside status_to
        dtmp = Document_tmp(item)
        dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
        dtmp.update_row(ots_client)
        # dhtml = Document_html(item)
        # dhtml.update_row(ots_client)
        # dtmp.delete_row(ots_client)
        # dhtml.delete_row(ots_client)
    producer()
    comsumer()
  1209. def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
  1210. def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
  1211. bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
  1212. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1213. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  1214. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1215. log("flow_dumplicate producer total_count:%d"%total_count)
  1216. list_dict = getRow_ots(rows)
  1217. for _dict in list_dict:
  1218. self.queue_dumplicate.put(_dict)
  1219. _count = len(list_dict)
  1220. while next_token and _count<flow_process_count:
  1221. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1222. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1223. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1224. list_dict = getRow_ots(rows)
  1225. for _dict in list_dict:
  1226. self.queue_dumplicate.put(_dict)
  1227. _count += len(list_dict)
  1228. def comsumer():
  1229. mt = MultiThreadHandler(self.queue_dumplicate,comsumer_handle,None,10,1,ots_client=self.ots_client)
  1230. mt.run()
  1231. def comsumer_handle(item,result_queue,ots_client):
  1232. self.post_extract(item)
  1233. base_list = []
  1234. set_docid = set()
  1235. list_rules = self.translate_dumplicate_rules(flow_dumplicate_status_from,item)
  1236. list_rules.sort(key=lambda x:x["confidence"],reverse=True)
  1237. # print(item,"len_rules",len(list_rules))
  1238. for _rule in list_rules:
  1239. _query = _rule["query"]
  1240. confidence = _rule["confidence"]
  1241. singleNum_keys = _rule["singleNum_keys"]
  1242. contain_keys = _rule["contain_keys"]
  1243. multiNum_keys = _rule["multiNum_keys"]
  1244. self.add_data_by_query(item,base_list,set_docid,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys)
  1245. item["confidence"] = 999
  1246. if item.get(document_tmp_docid) not in set_docid:
  1247. base_list.append(item)
  1248. final_list = self.dumplicate_fianl_check(base_list)
  1249. best_docid = self.get_best_docid(final_list)
  1250. # log(str(final_list))
  1251. _d = {"partitionkey":item["partitionkey"],
  1252. "docid":item["docid"],
  1253. "status":random.randint(*flow_dumplicate_status_to),
  1254. document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
  1255. }
  1256. dtmp = Document_tmp(_d)
  1257. dup_docid = set()
  1258. for _dict in final_list:
  1259. dup_docid.add(_dict.get(document_tmp_docid))
  1260. if item.get(document_tmp_docid) in dup_docid:
  1261. dup_docid.remove(item.get(document_tmp_docid))
  1262. if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
  1263. dtmp.setValue(document_tmp_save,1,True)
  1264. dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
  1265. dmp_docid = ",".join([str(a) for a in list(dup_docid)])
  1266. else:
  1267. dtmp.setValue(document_tmp_save,0,True)
  1268. if best_docid in dup_docid:
  1269. dup_docid.remove(best_docid)
  1270. dmp_docid = ",".join([str(a) for a in list(dup_docid)])
  1271. dmp_docid = "%d,%s"%(best_docid,dmp_docid)
  1272. else:
  1273. dmp_docid = ",".join([str(a) for a in list(dup_docid)])
  1274. dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
  1275. dtmp.update_row(self.ots_client)
  1276. #只保留当前公告
  1277. # self.save_dumplicate(final_list,best_docid,status_from,status_to)
  1278. #
  1279. # print("=base=",item)
  1280. # if len(final_list)>=1:
  1281. # print("==================")
  1282. # for _dict in final_list:
  1283. # print(_dict)
  1284. # print("========>>>>>>>>>>")
  1285. producer()
  1286. comsumer()
def merge_document(self,item,status_to=None):
    """Try to attach a saved document to an existing project in project2.

    Builds candidate queries in two tiers — first project_code combined
    with tenderee / project_name, then (only when no tier-1 query could be
    built) money-based combinations using bidding_budget / win_bid_price —
    and records a merge only when exactly one project matches.

    Returns the matched project uuid, or "" when there is no unambiguous
    match. NOTE(review): the ``status_to`` parameter is currently unused —
    confirm intent.
    """
    self.post_extract(item)
    docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
    _d = {"partitionkey":item["partitionkey"],
          "docid":item["docid"],
          }
    dtmp = Document_tmp(_d)
    # only documents marked save==1 participate in merging
    if item.get(document_tmp_save,1)==1:
        list_should_q = []
        if project_code!="" and tenderee!="":
            _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                         TermQuery("tenderee",tenderee)])
            list_should_q.append(_q)
        if project_name!="" and project_code!="":
            _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                         TermQuery("project_name",project_name)])
            list_should_q.append(_q)
        if len(list_should_q)>0:
            list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
            # merge only on an unambiguous single hit
            if len(list_data)==1:
                dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                print(item["docid"],list_data[0]["uuid"])
        else:
            # tier 2: fall back to budget/price based combinations
            list_should_q = []
            if bidding_budget!="" and project_code!="":
                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
                                             TermQuery("bidding_budget",float(bidding_budget))])
                list_should_q.append(_q)
            if tenderee!="" and bidding_budget!="" and project_name!="":
                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                             TermQuery("bidding_budget",float(bidding_budget)),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if tenderee!="" and win_bid_price!="" and project_name!="":
                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
                                             TermQuery("win_bid_price",float(win_bid_price)),
                                             TermQuery("project_name",project_name)])
                list_should_q.append(_q)
            if len(list_should_q)>0:
                list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
                if len(list_data)==1:
                    dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                    print(item["docid"],list_data[0]["uuid"])
    return dtmp.getProperties().get("merge_uuid","")
    # dtmp.update_row(self.ots_client)
def test_merge(self):
    """Offline evaluation of merge_document.

    Collects every saved document of a fixed day (docchannel 101/119/120),
    runs the merge on each one, flags whether the matched project carries a
    zhao_biao_page_time, and exports everything to test_merge.xlsx.
    """
    import pandas as pd
    import queue
    def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
        # gather the whole day's matching rows into a plain list
        list_test_item = []
        should_q = BoolQuery(should_queries=[
            TermQuery("docchannel",101),
            TermQuery("docchannel",119),
            TermQuery("docchannel",120)
        ])
        bool_query = BoolQuery(must_queries=[
            TermQuery("page_time","2022-04-22"),
            should_q,
            TermQuery("save",1)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_dumplicate producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            list_test_item.append(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                list_test_item.append(_dict)
            _count += len(list_dict)
            print("%d/%d"%(_count,total_count))
        return list_test_item
    from BaseDataMaintenance.model.ots.project import Project
    def comsumer_handle(item,result_queue,ots_client):
        # run the merge and, on a hit, check for a zhao_biao page time
        item["merge_uuid"] = self.merge_document(item)
        if item["merge_uuid"]!="":
            _dict = {"uuid":item["merge_uuid"]}
            _p = Project(_dict)
            _p.fix_columns(self.ots_client,["zhao_biao_page_time"],True)
            if _p.getProperties().get("zhao_biao_page_time","")!="":
                item["是否有招标"] = "是"
    list_test_item = producer()
    task_queue = queue.Queue()
    for item in list_test_item:
        task_queue.put(item)
    mt = MultiThreadHandler(task_queue,comsumer_handle,None,30,1,ots_client=self.ots_client)
    mt.run()
    # dump the annotated rows to an Excel report
    keys = [document_tmp_docid,document_tmp_docchannel,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_doctitle_refine,"win_tenderer","bidding_budget","win_bid_price","merge_uuid","是否有招标"]
    df_data = {}
    for k in keys:
        df_data[k] = []
    for item in list_test_item:
        for k in keys:
            df_data[k].append(item.get(k,""))
    df = pd.DataFrame(df_data)
    df.to_excel("test_merge.xlsx",columns=keys)
def flow_merge(self,process_count=10000,status_from=[71,80],status_to=[81,90]):
    """Merge flow over document_tmp.

    NOTE(review): currently DISABLED — the producer()/comsumer() calls at
    the bottom are commented out, so this method is a no-op.
    NOTE(review): mutable default arguments kept for interface
    compatibility — callers must not mutate them.
    """
    def producer(columns=[document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
        # rows waiting in the merge status window
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        log("flow_merge producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_merge.put(_dict)
        _count = len(list_dict)
        while next_token and _count<process_count:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_merge.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(self.queue_merge,comsumer_handle,None,10,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        self.merge_document(item,status_to)
    # producer()
    # comsumer()
    pass
  1416. def flow_syncho(self,status_from=[71,80],status_to=[81,90]):
  1417. pass
def flow_remove(self,process_count=flow_process_count,status_from=flow_remove_status_from):
    """Purge processed document_tmp rows (and their document_html rows)
    whose status is inside ``status_from`` and whose crtime is more than
    four days old."""
    def producer():
        current_date = getCurrent_date("%Y-%m-%d")
        # cut-off: midnight four days ago
        tmp_date = timeAdd(current_date,-4)
        bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
                                             RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
            ColumnsToGet(return_type=ColumnReturnType.NONE))
        log("flow_remove producer total_count:%d"%total_count)
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            self.queue_remove.put(_dict)
        _count = len(list_dict)
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                self.queue_remove.put(_dict)
            _count += len(list_dict)
    def comsumer():
        mt = MultiThreadHandler(self.queue_remove,comsumer_handle,None,10,1,ots_client=self.ots_client)
        mt.run()
    def comsumer_handle(item,result_queue,ots_client):
        # delete both the tmp row and its html companion
        dtmp = Document_tmp(item)
        dtmp.delete_row(self.ots_client)
        dhtml = Document_html(item)
        dhtml.delete_row(self.ots_client)
    producer()
    comsumer()
  1450. def start_flow_dumplicate(self):
  1451. schedule = BlockingScheduler()
  1452. schedule.add_job(self.flow_remove,"cron",hour="20")
  1453. schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
  1454. schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
  1455. schedule.start()
  1456. def flow_remove_project_tmp(self,process_count=flow_process_count):
  1457. def producer():
  1458. current_date = getCurrent_date("%Y-%m-%d")
  1459. tmp_date = timeAdd(current_date,-6*31)
  1460. bool_query = BoolQuery(must_queries=[
  1461. RangeQuery(project_page_time,range_to="%s"%(tmp_date))])
  1462. rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2_tmp","project2_tmp_index",
  1463. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
  1464. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1465. log("flow_remove project2_tmp producer total_count:%d"%total_count)
  1466. list_dict = getRow_ots(rows)
  1467. for _dict in list_dict:
  1468. self.queue_remove_project.put(_dict)
  1469. _count = len(list_dict)
  1470. while next_token:
  1471. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1472. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1473. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1474. list_dict = getRow_ots(rows)
  1475. for _dict in list_dict:
  1476. self.queue_remove_project.put(_dict)
  1477. _count += len(list_dict)
  1478. def comsumer():
  1479. mt = MultiThreadHandler(self.queue_remove_project,comsumer_handle,None,10,1,ots_client=self.ots_client)
  1480. mt.run()
  1481. def comsumer_handle(item,result_queue,ots_client):
  1482. ptmp = Project_tmp(item)
  1483. ptmp.delete_row(self.ots_client)
  1484. producer()
  1485. comsumer()
  1486. def start_flow_merge(self):
  1487. schedule = BlockingScheduler()
  1488. schedule.add_job(self.flow_merge,"cron",second="*/10")
  1489. schedule.start()
  1490. def download_attachment():
  1491. ots_client = getConnect_ots()
  1492. queue_attachment = Queue()
  1493. auth = getAuth()
  1494. oss2.defaults.connection_pool_size = 100
  1495. oss2.defaults.multiget_num_threads = 20
  1496. attachment_bucket_name = "attachment-hub"
  1497. if is_internal:
  1498. bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
  1499. else:
  1500. bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
  1501. bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
  1502. current_path = os.path.dirname(__file__)
  1503. def producer():
  1504. columns = [document_tmp_attachment_path]
  1505. bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_crtime,"2022-03-29 15:00:00","2022-03-29 17:00:00",True,True)])
  1506. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  1507. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
  1508. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1509. log("flow_attachment producer total_count:%d"%total_count)
  1510. list_dict = getRow_ots(rows)
  1511. for _dict in list_dict:
  1512. queue_attachment.put(_dict)
  1513. _count = len(list_dict)
  1514. while next_token:
  1515. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  1516. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1517. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1518. list_dict = getRow_ots(rows)
  1519. for _dict in list_dict:
  1520. queue_attachment.put(_dict)
  1521. _count += len(list_dict)
  1522. def comsumer():
  1523. mt = MultiThreadHandler(queue_attachment,comsumer_handle,None,10,1)
  1524. mt.run()
  1525. def getAttachments(list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
  1526. list_attachment = []
  1527. rows_to_get = []
  1528. for _md5 in list_filemd5[:50]:
  1529. if _md5 is None:
  1530. continue
  1531. primary_key = [(attachment_filemd5,_md5)]
  1532. rows_to_get.append(primary_key)
  1533. req = BatchGetRowRequest()
  1534. req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
  1535. try:
  1536. result = ots_client.batch_get_row(req)
  1537. attach_result = result.get_result_by_table(attachment_table_name)
  1538. for item in attach_result:
  1539. if item.is_ok:
  1540. _dict = getRow_ots_primary(item.row)
  1541. if _dict is not None:
  1542. list_attachment.append(attachment(_dict))
  1543. except Exception as e:
  1544. log(str(list_filemd5))
  1545. log("attachProcess comsumer error %s"%str(e))
  1546. return list_attachment
  1547. def comsumer_handle(item,result_queue):
  1548. page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
  1549. if len(page_attachments)==0:
  1550. pass
  1551. else:
  1552. list_fileMd5 = []
  1553. for _atta in page_attachments:
  1554. list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
  1555. list_attach = getAttachments(list_fileMd5)
  1556. for attach in list_attach:
  1557. filemd5 = attach.getProperties().get(attachment_filemd5)
  1558. _status = attach.getProperties().get(attachment_status)
  1559. _filetype = attach.getProperties().get(attachment_filetype)
  1560. _size = attach.getProperties().get(attachment_size)
  1561. _path = attach.getProperties().get(attachment_path)
  1562. _uuid = uuid4()
  1563. objectPath = attach.getProperties().get(attachment_path)
  1564. localpath = os.path.join(current_path,"download","%s.%s"%(filemd5,_filetype))
  1565. try:
  1566. if _size>ATTACHMENT_LARGESIZE:
  1567. pass
  1568. else:
  1569. downloadFile(bucket,objectPath,localpath)
  1570. except Exception as e:
  1571. traceback.print_exc()
  1572. producer()
  1573. comsumer()
  1574. def test_attachment_interface():
  1575. current_path = os.path.dirname(__file__)
  1576. task_queue = Queue()
  1577. def producer():
  1578. _count = 0
  1579. list_filename = os.listdir(os.path.join(current_path,"download"))
  1580. for _filename in list_filename:
  1581. _count += 1
  1582. _type = _filename.split(".")[1]
  1583. task_queue.put({"path":os.path.join(current_path,"download",_filename),"file_type":_type})
  1584. if _count>=500:
  1585. break
  1586. def comsumer():
  1587. mt = MultiThreadHandler(task_queue,comsumer_handle,None,10)
  1588. mt.run()
  1589. def comsumer_handle(item,result_queue):
  1590. _path = item.get("path")
  1591. _type = item.get("file_type")
  1592. _data_base64 = base64.b64encode(open(_path,"rb").read())
  1593. #调用接口处理结果
  1594. start_time = time.time()
  1595. _success,_html,swf_images = getAttachDealInterface(_data_base64,_type)
  1596. log("%s result:%s takes:%d"%(_path,str(_success),time.time()-start_time))
  1597. producer()
  1598. comsumer()
  1599. class Dataflow_attachment(Dataflow):
  1600. def __init__(self):
  1601. Dataflow.__init__(self)
  1602. self.process_list_thread = []
  1603. def flow_attachment_process(self):
  1604. self.process_comsumer()
  1605. def monitor_attachment_process(self):
  1606. alive_count = 0
  1607. for _t in self.process_list_thread:
  1608. if _t.is_alive():
  1609. alive_count += 1
  1610. log("attachment_process alive:%d total:%d"%(alive_count,len(self.process_list_thread)))
  1611. def process_comsumer(self):
  1612. if len(self.process_list_thread)==0:
  1613. thread_count = 60
  1614. for i in range(thread_count):
  1615. self.process_list_thread.append(Thread(target=self.process_comsumer_handle))
  1616. for t in self.process_list_thread:
  1617. t.start()
  1618. while 1:
  1619. failed_count = 0
  1620. for _i in range(len(self.process_list_thread)):
  1621. t = self.process_list_thread[_i]
  1622. if not t.is_alive():
  1623. failed_count += 1
  1624. self.prcess_list_thread[_i] = Thread(target=self.process_comsumer_handle)
  1625. self.prcess_list_thread[_i].start()
  1626. if failed_count>0:
  1627. log("attachment failed %d"%(failed_count))
  1628. time.sleep(5)
  1629. def process_comsumer_handle(self):
  1630. while 1:
  1631. _flag = False
  1632. log("attachment handle:%s"%str(threading.get_ident()))
  1633. try:
  1634. item = self.queue_attachment_ocr.get(True,timeout=0.2)
  1635. log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
  1636. self.attachment_recognize(item,None)
  1637. log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
  1638. except Exception as e:
  1639. _flag = True
  1640. pass
  1641. try:
  1642. item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
  1643. log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
  1644. self.attachment_recognize(item,None)
  1645. log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
  1646. except Exception as e:
  1647. _flag = True and _flag
  1648. pass
  1649. if _flag:
  1650. time.sleep(2)
  1651. def attachment_recognize(self,_dict,result_queue):
  1652. item = _dict.get("item")
  1653. list_attach = _dict.get("list_attach")
  1654. dhtml = Document_html({"partitionkey":item.get("partitionkey"),
  1655. "docid":item.get("docid")})
  1656. dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
  1657. _dochtmlcon = dhtml.getProperties().get("dochtmlcon","")
  1658. _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
  1659. log(str(swf_urls))
  1660. if not _succeed:
  1661. item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
  1662. else:
  1663. dhtml.updateSWFImages(swf_urls)
  1664. dhtml.updateAttachment(list_html)
  1665. dhtml.update_row(self.ots_client)
  1666. item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
  1667. item[document_tmp_attachment_extract_status] = 1
  1668. log("document:%d get attachments with result:%s"%(item.get("docid"),str(_succeed)))
  1669. dtmp = Document_tmp(item)
  1670. dtmp.update_row(self.ots_client)
  1671. def flow_attachment(self):
  1672. self.flow_attachment_producer()
  1673. self.flow_attachment_producer_comsumer()
  1674. def getAttachments(self,list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
  1675. list_attachment = []
  1676. rows_to_get = []
  1677. for _md5 in list_filemd5[:50]:
  1678. if _md5 is None:
  1679. continue
  1680. primary_key = [(attachment_filemd5,_md5)]
  1681. rows_to_get.append(primary_key)
  1682. req = BatchGetRowRequest()
  1683. req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
  1684. try:
  1685. result = self.ots_client.batch_get_row(req)
  1686. attach_result = result.get_result_by_table(attachment_table_name)
  1687. for item in attach_result:
  1688. if item.is_ok:
  1689. _dict = getRow_ots_primary(item.row)
  1690. if _dict is not None:
  1691. list_attachment.append(attachment(_dict))
  1692. except Exception as e:
  1693. log(str(list_filemd5))
  1694. log("attachProcess comsumer error %s"%str(e))
  1695. return list_attachment
  1696. def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
  1697. qsize_ocr = self.queue_attachment_ocr.qsize()
  1698. qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
  1699. log("queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(qsize_ocr,qsize_not_ocr))
  1700. #选择加入数据场景
  1701. if min(qsize_ocr,qsize_not_ocr)>200 or max(qsize_ocr,qsize_not_ocr)>1000:
  1702. return
  1703. #去重
  1704. set_docid = set()
  1705. set_docid = set_docid | set(self.list_attachment_ocr) | set(self.list_attachment_not_ocr)
  1706. if qsize_ocr>0:
  1707. self.list_attachment_ocr = self.list_attachment_ocr[-qsize_ocr:]
  1708. else:
  1709. self.list_attachment_ocr = []
  1710. if qsize_not_ocr>0:
  1711. self.list_attachment_not_ocr = self.list_attachment_not_ocr[-qsize_not_ocr:]
  1712. else:
  1713. self.list_attachment_not_ocr = []
  1714. try:
  1715. bool_query = BoolQuery(must_queries=[
  1716. RangeQuery(document_tmp_status,*flow_attachment_status_from,True,True),
  1717. # TermQuery(document_tmp_docid,234925191),
  1718. ])
  1719. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1720. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
  1721. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1722. log("flow_attachment producer total_count:%d"%total_count)
  1723. list_dict = getRow_ots(rows)
  1724. _count = 0
  1725. for _dict in list_dict:
  1726. docid = _dict.get(document_tmp_docid)
  1727. if docid in set_docid:
  1728. continue
  1729. self.queue_attachment.put(_dict,True)
  1730. _count += 1
  1731. while next_token and _count<flow_process_count:
  1732. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1733. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1734. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1735. list_dict = getRow_ots(rows)
  1736. for _dict in list_dict:
  1737. docid = _dict.get(document_tmp_docid)
  1738. if docid in set_docid:
  1739. continue
  1740. self.queue_attachment.put(_dict,True)
  1741. _count += 1
  1742. log("add attachment count:%d"%(_count))
  1743. except Exception as e:
  1744. log("flow attachment producer error:%s"%(str(e)))
  1745. traceback.print_exc()
  1746. def flow_attachment_producer_comsumer(self):
  1747. log("start flow_attachment comsumer")
  1748. mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
  1749. mt.run()
  1750. def set_queue(self,_dict):
  1751. list_attach = _dict.get("list_attach")
  1752. to_ocr = False
  1753. for attach in list_attach:
  1754. if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
  1755. to_ocr = True
  1756. break
  1757. if to_ocr:
  1758. self.queue_attachment_ocr.put(_dict,True)
  1759. # self.list_attachment_ocr.append(_dict.get("item").get(document_tmp_docid))
  1760. else:
  1761. self.queue_attachment_not_ocr.put(_dict,True)
  1762. # self.list_attachment_not_ocr.append(_dict.get("item").get(document_tmp_docid))
def comsumer_handle(self, item, result_queue):
    # Process one queued document: documents without attachments are marked
    # succeeded immediately; others are routed to the OCR / non-OCR queues.
    try:
        page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
        if len(page_attachments)==0:
            # no attachments -> nothing to do, mark as succeeded right away
            item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
            dtmp = Document_tmp(item)
            dtmp.update_row(self.ots_client)
        else:
            list_fileMd5 = []
            for _atta in page_attachments:
                list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
            list_attach = self.getAttachments(list_fileMd5)
            # Attachments not fully uploaded yet: documents younger than 2
            # hours are deferred (status reset to 1 so they are retried later).
            if len(page_attachments)!=len(list_attach) and time.mktime(time.localtime())-time.mktime(time.strptime(item.get(document_tmp_crtime),"%Y-%m-%d %H:%M:%S"))<7200:
                item[document_tmp_status] = 1
                dtmp = Document_tmp(item)
                dtmp.update_row(self.ots_client)
                return
            self.set_queue({"item":item,"list_attach":list_attach})
    except Exception as e:
        # best-effort: a bad row is logged and skipped, not retried here
        traceback.print_exc()
  1784. def start_flow_attachment(self):
  1785. schedule = BlockingScheduler()
  1786. schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
  1787. schedule.add_job(self.flow_attachment,"cron",second="*/10")
  1788. schedule.start()
  1789. class Dataflow_extract(Dataflow):
  1790. def __init__(self):
  1791. Dataflow.__init__(self)
  1792. def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
  1793. q_size = self.queue_extract.qsize()
  1794. if q_size>100:
  1795. return
  1796. set_docid = set(self.list_extract)
  1797. if q_size>0:
  1798. self.list_extract = self.list_extract[-q_size:]
  1799. else:
  1800. self.list_extract = []
  1801. try:
  1802. bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*flow_extract_status_from,True,True)])
  1803. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1804. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.ASC)]),limit=100,get_total_count=True),
  1805. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1806. log("flow_extract producer total_count:%d"%total_count)
  1807. list_dict = getRow_ots(rows)
  1808. for _dict in list_dict:
  1809. docid = _dict.get(document_tmp_docid)
  1810. if docid in set_docid:
  1811. self.list_extract.insert(0,docid)
  1812. continue
  1813. else:
  1814. self.queue_extract.put(_dict)
  1815. self.list_extract.append(docid)
  1816. _count = len(list_dict)
  1817. while next_token and _count<flow_process_count:
  1818. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  1819. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1820. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1821. list_dict = getRow_ots(rows)
  1822. for _dict in list_dict:
  1823. docid = _dict.get(document_tmp_docid)
  1824. if docid in set_docid:
  1825. self.list_extract.insert(0,docid)
  1826. continue
  1827. else:
  1828. self.queue_extract.put(_dict)
  1829. self.list_extract.append(docid)
  1830. _count += len(list_dict)
  1831. except Exception as e:
  1832. log("flow extract producer error:%s"%(str(e)))
  1833. traceback.print_exc()
  1834. def flow_extract(self,):
  1835. self.comsumer()
  1836. def comsumer(self):
  1837. mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,35,1,True)
  1838. mt.run()
  1839. def comsumer_handle(self,item,result_queue):
  1840. dhtml = Document_html({"partitionkey":item.get("partitionkey"),
  1841. "docid":item.get("docid")})
  1842. dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
  1843. item[document_tmp_dochtmlcon] = dhtml.getProperties().get(document_tmp_dochtmlcon,"")
  1844. _extract = Document_extract({})
  1845. _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
  1846. _extract.setValue(document_extract2_docid,item.get(document_docid))
  1847. all_done = 1
  1848. if all_done:
  1849. data = item
  1850. resp = requests.post(self.other_url,json=data,headers=self.header)
  1851. if (resp.status_code >=200 and resp.status_code<=210):
  1852. _extract.setValue(document_extract2_other_json,resp.content.decode("utf8"),True)
  1853. else:
  1854. all_done = -1
  1855. data = {}
  1856. for k,v in item.items():
  1857. data[k] = v
  1858. data["timeout"] = 240
  1859. data["doc_id"] = data.get(document_tmp_docid)
  1860. data["content"] = data.get(document_tmp_dochtmlcon,"")
  1861. if document_tmp_dochtmlcon in data:
  1862. data.pop(document_tmp_dochtmlcon)
  1863. data["title"] = data.get(document_tmp_doctitle,"")
  1864. data["web_source_no"] = item.get(document_tmp_web_source_no,"")
  1865. data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
  1866. if all_done:
  1867. resp = requests.post(self.extract_url,json=data,headers=self.header)
  1868. if (resp.status_code >=200 and resp.status_code<=210):
  1869. _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
  1870. else:
  1871. all_done = -2
  1872. if all_done:
  1873. resp = requests.post(self.industy_url,json=data,headers=self.header)
  1874. if (resp.status_code >=200 and resp.status_code<=210):
  1875. _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
  1876. else:
  1877. all_done = -3
  1878. _dict = {document_partitionkey:item.get(document_tmp_partitionkey),
  1879. document_docid:item.get(document_tmp_docid),
  1880. }
  1881. dtmp = Document_tmp(_dict)
  1882. if all_done!=1:
  1883. sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
  1884. dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_failed_to),True)
  1885. dtmp.update_row(self.ots_client)
  1886. else:
  1887. dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
  1888. dtmp.update_row(self.ots_client)
  1889. # 插入接口表,上线放开
  1890. _extract.setValue(document_extract2_status,random.randint(1,50),True)
  1891. _extract.update_row(self.ots_client)
  1892. log("process docid:%d %s"%(data["doc_id"],str(all_done)))
  1893. def start_flow_extract(self):
  1894. schedule = BlockingScheduler()
  1895. schedule.add_job(self.flow_extract_producer,"cron",second="*/10")
  1896. schedule.add_job(self.flow_extract,"cron",second="*/10")
  1897. schedule.start()
  1898. class Dataflow_dumplicate(Dataflow):
class DeleteListener():
    # Message-queue listener bound to one ActiveMQ connection; every received
    # frame is forwarded to the callback supplied at construction time.
    def __init__(self,conn,_func,*args,**kwargs):
        # extra *args/**kwargs are accepted but ignored
        self.conn = conn
        self._func = _func
    def on_error(self, headers,*args,**kwargs):
        log('received an error %s' % str(headers.body))
    def on_message(self, headers,*args,**kwargs):
        # Hand the raw frame plus its connection to the handler; the handler
        # is expected to acknowledge the message itself.
        try:
            message_id = headers.headers["message-id"]
            body = headers.body
            log("get message %s"%(message_id))
            self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
        except Exception as e:
            traceback.print_exc()
            pass
    def __del__(self):
        # best-effort disconnect when the listener is garbage-collected
        self.conn.disconnect()
  1916. def __init__(self,start_delete_listener=True):
  1917. Dataflow.__init__(self,)
  1918. self.c_f_get_extractCount = f_get_extractCount()
  1919. self.c_f_get_package = f_get_package()
  1920. logging.basicConfig(level = logging.info,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  1921. self.fix_doc_docid = None
  1922. self.bdm = BaseDataMonitor()
  1923. if start_delete_listener:
  1924. self.delete_comsumer_counts = 2
  1925. self.doc_delete_queue = "/queue/doc_delete_queue"
  1926. self.doc_delete_result = "/queue/doc_delete_result"
  1927. self.pool_mq_ali = ConnectorPool(1,10,getConnect_activateMQ_ali)
  1928. for _ in range(self.delete_comsumer_counts):
  1929. conn = getConnect_activateMQ_ali()
  1930. listener = self.DeleteListener(conn,self.delete_doc_handle)
  1931. createComsumer(listener,self.doc_delete_queue)
  1932. def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart","time_release"]):
  1933. dict_time = {}
  1934. for k in keys:
  1935. dict_time[k] = _extract.get(k)
  1936. return dict_time
  1937. def post_extract(self,_dict):
  1938. win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
  1939. _dict["win_tenderer"] = win_tenderer
  1940. _dict["bidding_budget"] = bidding_budget
  1941. _dict["win_bid_price"] = win_bid_price
  1942. extract_json = _dict.get(document_tmp_extract_json,"{}")
  1943. _extract = json.loads(extract_json)
  1944. _dict["product"] = ",".join(_extract.get("product",[]))
  1945. _dict["fingerprint"] = _extract.get("fingerprint","")
  1946. _dict["project_codes"] = _extract.get("code",[])
  1947. if len(_dict["project_codes"])>0:
  1948. _dict["project_code"] = _dict["project_codes"][0]
  1949. else:
  1950. _dict["project_code"] = ""
  1951. _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
  1952. if _dict["doctitle_refine"]=="":
  1953. _dict["doctitle_refine"] = _dict.get("doctitle")
  1954. _dict["nlp_enterprise"] = str({"indoctextcon":_extract.get("nlp_enterprise",[]),
  1955. "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])})
  1956. _dict["extract_count"] = self.c_f_get_extractCount.evaluate(extract_json)
  1957. _dict["package"] = self.c_f_get_package.evaluate(extract_json)
  1958. _dict["project_name"] = _extract.get("name","")
  1959. _dict["dict_time"] = self.get_dict_time(_extract)
def dumplicate_fianl_check(self, base_list, b_log=False):
    # Final pairwise confirmation of a candidate duplicate group.
    # The group is sorted by confidence (desc) and scanned from the head:
    # each document must look like a duplicate (prob > 0.1) of each of the
    # up-to-10 documents before it.  Scanning stops at the first failure and
    # only the confirmed prefix (at least 2 documents) is returned.
    the_group = base_list
    the_group.sort(key=lambda x:x["confidence"],reverse=True)
    _index = 0
    base_fingerprint = "None"
    if len(base_list)>0:
        base_fingerprint = base_list[0]["fingerprint"]
    for _i in range(1,len(base_list)):
        _dict1 = base_list[_i]
        fingerprint_less = _dict1["fingerprint"]
        _pass = True
        # a fingerprint identical to the head is always a duplicate
        if fingerprint_less==base_fingerprint:
            _index = _i
            continue
        for _j in range(min(_i,10)):
            _dict2 = base_list[_j]
            _prob = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
            print("_prob:",_prob)
            if _prob<=0.1:
                _pass = False
                break
        log("checking index:%d %s %.2f"%(_i,str(_pass),_prob))
        _index = _i
        if not _pass:
            _index -= 1
            break
    if _index>=1:
        # # dedup documents that were stored repeatedly (same fingerprint)
        # _l = the_group[:_index+1]
        # set_fingerprint = set()
        # final_l = []
        # for _dict in _l:
        #     fingerprint_less = _dict["fingerprint"]
        #     if fingerprint_less in set_fingerprint:
        #         continue
        #     else:
        #         final_l.append(_dict)
        #         set_fingerprint.add(fingerprint_less)
        return the_group[:_index+1]
    return []
  2000. def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
  2001. document_less = _dict1
  2002. docid_less = _dict1["docid"]
  2003. docchannel_less = document_less["docchannel"]
  2004. page_time_less = document_less["page_time"]
  2005. doctitle_refine_less = document_less["doctitle_refine"]
  2006. project_codes_less = document_less["project_codes"]
  2007. nlp_enterprise_less = document_less["nlp_enterprise"]
  2008. tenderee_less = document_less["tenderee"]
  2009. agency_less = document_less["agency"]
  2010. win_tenderer_less = document_less["win_tenderer"]
  2011. bidding_budget_less = document_less["bidding_budget"]
  2012. win_bid_price_less = document_less["win_bid_price"]
  2013. product_less = document_less["product"]
  2014. package_less = document_less["package"]
  2015. json_time_less = document_less["dict_time"]
  2016. project_name_less = document_less["project_name"]
  2017. fingerprint_less = document_less["fingerprint"]
  2018. extract_count_less = document_less["extract_count"]
  2019. web_source_no_less = document_less.get("web_source_no")
  2020. province_less = document_less.get("province")
  2021. city_less = document_less.get("city")
  2022. district_less = document_less.get("district")
  2023. document_greater = _dict2
  2024. docid_greater = _dict2["docid"]
  2025. page_time_greater = document_greater["page_time"]
  2026. docchannel_greater = document_greater["docchannel"]
  2027. doctitle_refine_greater = document_greater["doctitle_refine"]
  2028. project_codes_greater = document_greater["project_codes"]
  2029. nlp_enterprise_greater = document_greater["nlp_enterprise"]
  2030. tenderee_greater = document_greater["tenderee"]
  2031. agency_greater = document_greater["agency"]
  2032. win_tenderer_greater = document_greater["win_tenderer"]
  2033. bidding_budget_greater = document_greater["bidding_budget"]
  2034. win_bid_price_greater = document_greater["win_bid_price"]
  2035. product_greater = document_greater["product"]
  2036. package_greater = document_greater["package"]
  2037. json_time_greater = document_greater["dict_time"]
  2038. project_name_greater = document_greater["project_name"]
  2039. fingerprint_greater = document_greater["fingerprint"]
  2040. extract_count_greater = document_greater["extract_count"]
  2041. web_source_no_greater = document_greater.get("web_source_no")
  2042. province_greater = document_greater.get("province")
  2043. city_greater = document_greater.get("city")
  2044. district_greater = document_greater.get("district")
  2045. hard_level=1
  2046. if web_source_no_less==web_source_no_greater=="17397-3":
  2047. hard_level=2
  2048. return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level)
def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
    # Legacy pairwise duplicate scorer (the live path is dumplicate_check ->
    # check_dumplicate_rule).  Returns a probability in [0,1]; a base score
    # derived from how many of 8 key fields agree is then subjected to a
    # series of veto checks that can force the result to 0.
    document_less = _dict1
    docid_less = _dict1["docid"]
    docchannel_less = document_less["docchannel"]
    page_time_less = document_less["page_time"]
    doctitle_refine_less = document_less["doctitle_refine"]
    project_codes_less = document_less["project_codes"]
    nlp_enterprise_less = document_less["nlp_enterprise"]
    tenderee_less = document_less["tenderee"]
    agency_less = document_less["agency"]
    win_tenderer_less = document_less["win_tenderer"]
    bidding_budget_less = document_less["bidding_budget"]
    win_bid_price_less = document_less["win_bid_price"]
    product_less = document_less["product"]
    package_less = document_less["package"]
    json_time_less = document_less["dict_time"]
    project_name_less = document_less["project_name"]
    fingerprint_less = document_less["fingerprint"]
    extract_count_less = document_less["extract_count"]
    document_greater = _dict2
    docid_greater = _dict2["docid"]
    page_time_greater = document_greater["page_time"]
    doctitle_refine_greater = document_greater["doctitle_refine"]
    project_codes_greater = document_greater["project_codes"]
    nlp_enterprise_greater = document_greater["nlp_enterprise"]
    tenderee_greater = document_greater["tenderee"]
    agency_greater = document_greater["agency"]
    win_tenderer_greater = document_greater["win_tenderer"]
    bidding_budget_greater = document_greater["bidding_budget"]
    win_bid_price_greater = document_greater["win_bid_price"]
    product_greater = document_greater["product"]
    package_greater = document_greater["package"]
    json_time_greater = document_greater["dict_time"]
    project_name_greater = document_greater["project_name"]
    fingerprint_greater = document_greater["fingerprint"]
    extract_count_greater = document_greater["extract_count"]
    # identical content fingerprints are a certain duplicate
    if fingerprint_less==fingerprint_greater:
        return 1
    # count how many of the 8 key fields agree
    same_count = 0
    all_count = 8
    if len(set(project_codes_less) & set(project_codes_greater))>0:
        same_count += 1
    if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
        same_count += 1
    if getLength(agency_less)>0 and agency_less==agency_greater:
        same_count += 1
    if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
        same_count += 1
    if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
        same_count += 1
    if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
        same_count += 1
    if getLength(project_name_less)>0 and project_name_less==project_name_greater:
        same_count += 1
    if getLength(doctitle_refine_less)>0 and (doctitle_refine_less==doctitle_refine_greater or doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
        same_count += 1
    # the base probability shrinks as the candidate set (min_counts) grows
    base_prob = 0
    if min_counts<3:
        base_prob = 0.9
    elif min_counts<5:
        base_prob = 0.8
    elif min_counts<8:
        base_prob = 0.7
    else:
        base_prob = 0.6
    _prob = base_prob*same_count/all_count
    # documents with very little extracted content get a floor probability
    if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
        _prob = 0.15
    if _prob<0.1:
        return _prob
    # Veto checks.  Each field key in check_result is graded:
    # 0 = conflict, 1 = unknown/neutral, 2 = positive match; any conflict
    # clears "pass".
    check_result = {"pass":1}
    if docchannel_less in (51,102,103,104,115,116,117):
        # result-like channels: differing titles on different days conflict
        if doctitle_refine_less!=doctitle_refine_greater:
            if page_time_less!=page_time_greater:
                check_result["docchannel"] = 0
                check_result["pass"] = 0
        else:
            check_result["docchannel"] = 2
    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
        check_result["doctitle"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
    else:
        check_result["doctitle"] = 2
    #added check
    if not check_codes(project_codes_less,project_codes_greater):
        check_result["code"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
    else:
        if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
            check_result["code"] = 2
        else:
            check_result["code"] = 1
    if not check_product(product_less,product_greater):
        check_result["product"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
    else:
        if getLength(product_less)>0 and getLength(product_greater)>0:
            check_result["product"] = 2
        else:
            check_result["product"] = 1
    # NOTE(review): check_demand is called with no arguments here —
    # presumably a placeholder check; confirm against its definition.
    if not check_demand():
        check_result["pass"] = 0
    if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
                        tenderee_less,tenderee_greater,
                        agency_less,agency_greater,
                        win_tenderer_less,win_tenderer_greater):
        check_result["entity"] = 0
        check_result["pass"] = 0
        if b_log:
            logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
    else:
        if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
            check_result["entity"] = 2
        elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
            check_result["entity"] = 2
        else:
            check_result["entity"] = 1
    if not check_money(bidding_budget_less,bidding_budget_greater,
                       win_bid_price_less,win_bid_price_greater):
        if b_log:
            logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
        check_result["money"] = 0
        check_result["pass"] = 0
    else:
        if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
            check_result["money"] = 2
        elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
            check_result["money"] = 2
        else:
            check_result["money"] = 1
    #added check
    if not check_package(package_less,package_greater):
        if b_log:
            logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
        check_result["package"] = 0
        check_result["pass"] = 0
    else:
        if getLength(package_less)>0 and getLength(package_greater)>0:
            check_result["package"] = 2
        else:
            check_result["package"] = 1
    #added check
    if not check_time(json_time_less,json_time_greater):
        if b_log:
            logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
        # locate the conflicting time_* key (both sides non-empty but unequal)
        if isinstance(json_time_less,dict):
            time_less = json_time_less
        else:
            time_less = json.loads(json_time_less)
        if isinstance(json_time_greater,dict):
            time_greater = json_time_greater
        else:
            time_greater = json.loads(json_time_greater)
        for k,v in time_less.items():
            if getLength(v)>0:
                v1 = time_greater.get(k,"")
                if getLength(v1)>0:
                    if v!=v1:
                        log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
                        check_result["time"] = 0
                        check_result["pass"] = 0
    else:
        if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
            check_result["time"] = 2
        else:
            check_result["time"] = 1
    if check_result.get("pass",0)==0:
        if b_log:
            logging.info(str(check_result))
        # a money conflict is always fatal
        if check_result.get("money",1)==0:
            return 0
        # strong agreement on entity/code/title/product/money overrides a
        # single failed veto; anything else is rejected
        if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
            return _prob
        else:
            return 0
    if check_result.get("time",1)==0:
        return 0
    return _prob
def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
    # Run one dedup candidate search against OTS and return the scored rows.
    # When merge is False every row (other than item itself) is re-scored via
    # dumplicate_check and rows below 0.1 confidence are dropped; when merge
    # is True all rows are returned unscored.  Retries the whole search up to
    # retry_times times, returning [] when every attempt fails.
    # NOTE(review): the confidence / singleNum_keys / contain_keys /
    # multiNum_keys / notlike_keys parameters are not used in this body —
    # presumably kept for interface compatibility; confirm against callers.
    for _ in range(retry_times):
        try:
            _time = time.time()
            check_time = 0
            # a list argument is treated as OR-combined sub-queries
            if isinstance(_query,list):
                bool_query = BoolQuery(should_queries=_query)
            else:
                bool_query = _query
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=30,get_total_count=True),
                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            list_data = []
            for _dict in list_dict:
                self.post_extract(_dict)
                _docid = _dict.get(document_tmp_docid)
                if merge:
                    list_data.append(_dict)
                else:
                    if _docid!=item.get(document_tmp_docid):
                        _time1 = time.time()
                        # total_count doubles as the min_counts hint for scoring
                        confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
                        check_time+= time.time()-_time1
                        _dict["confidence"] = confidence
                        _dict["min_counts"] = total_count
                        if not confidence<0.1:
                            list_data.append(_dict)
            all_time = time.time()-_time
            # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
            return list_data
        except Exception as e:
            traceback.print_exc()
    return []
def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
    # Search for duplicate candidates with _query and append the not-yet-seen
    # high-confidence rows to base_list; every candidate docid is recorded in
    # set_docid so later rules do not re-add it.
    list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
    for _dict in list_dict:
        _docid = _dict.get(document_tmp_docid)
        # confidence was computed by search_data_by_query for each row
        confidence = _dict["confidence"]
        print("confidence",_docid,confidence)
        if confidence>0.1:
            if _docid not in set_docid:
                base_list.append(_dict)
                set_docid.add(_docid)
        # NOTE(review): this unconditional add makes the add above redundant
        # and records even low-confidence docids — confirm this is intended.
        set_docid.add(_docid)
  2278. def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=False):
  2279. for k,v in _dict.items():
  2280. if getLength(v)==0:
  2281. return
  2282. _dict.update(base_dict)
  2283. if b_log:
  2284. log(str(_dict))
  2285. _query = self.generate_dumplicate_query(_dict,must_not_dict)
  2286. _rule = {"confidence":confidence,
  2287. "item":item,
  2288. "query":_query,
  2289. "singleNum_keys":[],
  2290. "contain_keys":[],
  2291. "multiNum_keys":[]}
  2292. list_rules.append(_rule)
  2293. def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False):
  2294. docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
  2295. current_date = getCurrent_date("%Y-%m-%d")
  2296. if page_time=='':
  2297. page_time = current_date
  2298. if page_time>=timeAdd(current_date,-2):
  2299. table_name = "document_tmp"
  2300. table_index = "document_tmp_index"
  2301. base_dict = {
  2302. "docchannel":item.get("docchannel",52),
  2303. "status":[status_from[0]],
  2304. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2305. }
  2306. must_not_dict = {"save":0,"docid":item.get("docid")}
  2307. doctitle_refine_name = "doctitle_refine"
  2308. else:
  2309. table_name = "document"
  2310. table_index = "document_index"
  2311. if get_all:
  2312. _status = [201,450]
  2313. else:
  2314. _status = [201,300]
  2315. base_dict = {
  2316. "docchannel":item["docchannel"],
  2317. "status":_status,
  2318. "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
  2319. }
  2320. must_not_dict = {"docid":item.get("docid")}
  2321. doctitle_refine_name = "doctitle"
  2322. list_rules = []
  2323. singleNum_keys = ["tenderee","win_tenderer"]
  2324. confidence = 100
  2325. self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item,b_log=to_log)
  2326. confidence = 90
  2327. _dict = {document_tmp_agency:agency,
  2328. "win_tenderer":win_tenderer,
  2329. "win_bid_price":win_bid_price}
  2330. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2331. _dict = {document_tmp_agency:agency,
  2332. "win_tenderer":win_tenderer,
  2333. "bidding_budget":bidding_budget}
  2334. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2335. _dict = {document_tmp_agency:agency,
  2336. "win_bid_price":win_bid_price,
  2337. "bidding_budget":bidding_budget}
  2338. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2339. _dict = {win_tenderer:win_tenderer,
  2340. "win_bid_price":win_bid_price,
  2341. "bidding_budget":bidding_budget}
  2342. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2343. _dict = {"tenderee":tenderee,
  2344. "win_tenderer":win_tenderer,
  2345. "win_bid_price":win_bid_price}
  2346. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2347. _dict = {"tenderee":tenderee,
  2348. "win_tenderer":win_tenderer,
  2349. "bidding_budget":bidding_budget}
  2350. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2351. _dict = {"tenderee":tenderee,
  2352. "win_bid_price":win_bid_price,
  2353. "bidding_budget":bidding_budget}
  2354. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2355. _dict = {"tenderee":tenderee,
  2356. "agency":agency,
  2357. "win_tenderer":win_tenderer}
  2358. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2359. _dict = {"tenderee":tenderee,
  2360. "agency":agency,
  2361. "win_bid_price":win_bid_price}
  2362. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2363. _dict = {"tenderee":tenderee,
  2364. "agency":agency,
  2365. "bidding_budget":bidding_budget}
  2366. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2367. confidence=85
  2368. _dict = {"tenderee":tenderee,
  2369. "agency":agency
  2370. }
  2371. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2372. _dict = {"tenderee":tenderee,
  2373. "project_codes":project_code
  2374. }
  2375. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2376. _dict = {"tenderee":tenderee,
  2377. "project_name":project_name
  2378. }
  2379. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2380. if getLength(product)>0:
  2381. l_p = product.split(",")
  2382. _dict = {"tenderee":tenderee,
  2383. "product":l_p[0]
  2384. }
  2385. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2386. _dict = {"tenderee":tenderee,
  2387. "win_tenderer":win_tenderer
  2388. }
  2389. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2390. _dict = {"tenderee":tenderee,
  2391. "win_bid_price":win_bid_price
  2392. }
  2393. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2394. _dict = {"tenderee":tenderee,
  2395. "bidding_budget":bidding_budget
  2396. }
  2397. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2398. _dict = {"tenderee":tenderee,
  2399. doctitle_refine_name:doctitle_refine
  2400. }
  2401. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2402. _dict = {"agency":agency,
  2403. "project_codes":project_code
  2404. }
  2405. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2406. _dict = {"agency":agency,
  2407. "project_name":project_name
  2408. }
  2409. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2410. _dict = {"project_codes":project_code,
  2411. "project_name":project_name
  2412. }
  2413. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2414. _dict = {"project_codes":project_code,
  2415. "win_tenderer":win_tenderer
  2416. }
  2417. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2418. _dict = {"project_codes":project_code,
  2419. "win_bid_price":win_bid_price
  2420. }
  2421. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2422. _dict = {"project_codes":project_code,
  2423. "bidding_budget":bidding_budget
  2424. }
  2425. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2426. _dict = {"project_codes":project_code,
  2427. doctitle_refine_name:doctitle_refine
  2428. }
  2429. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2430. _dict = {"project_name":project_name,
  2431. "win_tenderer":win_tenderer
  2432. }
  2433. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2434. _dict = {"project_name":project_name,
  2435. "win_bid_price":win_bid_price
  2436. }
  2437. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2438. _dict = {"project_name":project_name,
  2439. "bidding_budget":bidding_budget
  2440. }
  2441. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2442. _dict = {"project_name":project_name,
  2443. doctitle_refine_name:doctitle_refine
  2444. }
  2445. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2446. _dict = {"win_tenderer":win_tenderer,
  2447. "win_bid_price":win_bid_price
  2448. }
  2449. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2450. _dict = {"win_tenderer":win_tenderer,
  2451. "bidding_budget":bidding_budget
  2452. }
  2453. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2454. _dict = {"win_tenderer":win_tenderer,
  2455. doctitle_refine_name:doctitle_refine
  2456. }
  2457. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2458. _dict = {"win_bid_price":win_bid_price,
  2459. "bidding_budget":bidding_budget
  2460. }
  2461. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2462. _dict = {"win_bid_price":win_bid_price,
  2463. doctitle_refine_name:doctitle_refine
  2464. }
  2465. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2466. _dict = {"bidding_budget":bidding_budget,
  2467. doctitle_refine_name:doctitle_refine
  2468. }
  2469. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2470. confidence=80
  2471. _dict = {doctitle_refine_name:doctitle_refine}
  2472. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2473. _dict = {"project_codes":project_code}
  2474. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2475. confidence=70
  2476. _dict = {"project_name":project_name}
  2477. self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
  2478. return list_rules,table_name,table_index
def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]):
    '''
    Producer side of the dedup ("dumplicate") flow: page through document_tmp
    rows whose status falls in `status_from` and enqueue them for consumers.

    :param process_count: soft cap on rows enqueued per call
    :param status_from: (low, high) status bounds, both inclusive
    :param columns: columns fetched per row (mutable default, but only read)
    :return: None
    '''
    q_size = self.queue_dumplicate.qsize()
    log("dumplicate queue size %d"%(q_size))
    # Skip this round while consumers still have a sizeable backlog.
    if q_size>process_count//3:
        return
    bool_query = BoolQuery(must_queries=[
        RangeQuery(document_tmp_status,*status_from,True,True),
        # TermQuery("docid",271983871)
    ])
    # First page, newest (by update flag, then docid) first.
    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_update_document,SortOrder.DESC),FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                        ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    log("flow_dumplicate producer total_count:%d"%total_count)
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        docid = _dict.get(document_tmp_docid)
        # dumplicate_set remembers recently queued docids so the same doc is
        # not enqueued twice across producer rounds.
        if docid in self.dumplicate_set:
            continue
        self.dumplicate_set.add(docid)
        self.queue_dumplicate.put(_dict)
    _count = len(list_dict)
    # Follow-up pages until the token runs out or the cap is reached.
    while next_token and _count<process_count:
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            docid = _dict.get(document_tmp_docid)
            if docid in self.dumplicate_set:
                continue
            self.dumplicate_set.add(docid)
            self.queue_dumplicate.put(_dict)
        _count += len(list_dict)
    # Bound the dedup memory: keep only the highest (newest) docids.
    _l = list(self.dumplicate_set)
    _l.sort(key=lambda x:x,reverse=True)
    self.dumplicate_set = set(_l[:flow_process_count*2])
  2515. def comsumer_flow_dumplicate(self):
  2516. mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
  2517. mt.run()
  2518. def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
  2519. self.producer_flow_dumplicate(process_count=process_count,status_from=status_from)
  2520. # self.comsumer_flow_dumplicate()
  2521. def flow_dumpcate_comsumer(self):
  2522. from multiprocessing import Process
  2523. process_count = 3
  2524. thread_count = 20
  2525. list_process = []
  2526. def start_thread():
  2527. mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,restart=True,timeout=600,ots_client=self.ots_client)
  2528. mt.run()
  2529. for _ in range(process_count):
  2530. p = Process(target=start_thread)
  2531. list_process.append(p)
  2532. for p in list_process:
  2533. p.start()
  2534. while 1:
  2535. for _i in range(len(list_process)):
  2536. p = list_process[_i]
  2537. if not p.is_alive():
  2538. p = Process(target=start_thread)
  2539. list_process[_i] = p
  2540. p.start()
  2541. time.sleep(1)
  2542. # mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,40,1,ots_client=self.ots_client)
  2543. # mt.run()
  2544. def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
  2545. '''
  2546. 根据docid查询公告内容,先查询document_tmp,再查询document
  2547. :param list_docids:
  2548. :return:
  2549. '''
  2550. list_docs = []
  2551. set_fingerprint = set()
  2552. for _docid in list_docids:
  2553. docid = int(_docid)
  2554. _dict = {document_partitionkey:getPartitionKey(docid),
  2555. document_docid:docid}
  2556. _doc = Document_tmp(_dict)
  2557. _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
  2558. if not _exists:
  2559. _doc = Document(_dict)
  2560. _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
  2561. if _exists:
  2562. _fingerprint = _doc.getProperties().get(document_fingerprint)
  2563. if _fingerprint in set_fingerprint:
  2564. continue
  2565. set_fingerprint.add(_fingerprint)
  2566. list_docs.append(_doc)
  2567. for _doc in list_docs:
  2568. try:
  2569. _sub_docs_json = _doc.getProperties().get(document_tmp_sub_docs_json)
  2570. if _sub_docs_json is not None:
  2571. _doc.setValue("sub_docs",json.loads(_sub_docs_json),False)
  2572. except Exception as e:
  2573. traceback.print_exc()
  2574. list_docs.sort(key=lambda x:x.getProperties().get(document_page_time,""))
  2575. return list_docs
  2576. def is_same_package(self,_dict1,_dict2):
  2577. sub_project_name1 = _dict1.get(project_sub_project_name,"")
  2578. if sub_project_name1=="Project":
  2579. sub_project_name1 = ""
  2580. win_tenderer1 = _dict1.get(project_win_tenderer,"")
  2581. win_bid_price1 = _dict1.get(project_win_bid_price,0)
  2582. bidding_budget1 = _dict1.get(project_bidding_budget,0)
  2583. sub_project_name2 = _dict2.get(project_sub_project_name,"")
  2584. if sub_project_name2=="Project":
  2585. sub_project_name2 = ""
  2586. win_tenderer2 = _dict2.get(project_win_tenderer,"")
  2587. win_bid_price2 = _dict2.get(project_win_bid_price,0)
  2588. bidding_budget2 = _dict2.get(project_bidding_budget,0)
  2589. _set = set([a for a in [sub_project_name1,sub_project_name2] if a!=""])
  2590. if len(_set)>1:
  2591. return False
  2592. _set = set([a for a in [win_tenderer1,win_tenderer2] if a!=""])
  2593. if len(_set)>1:
  2594. return False
  2595. _set = set([a for a in [win_bid_price1,win_bid_price2] if a!=0])
  2596. if len(_set)>1:
  2597. return False
  2598. _set = set([a for a in [bidding_budget1,bidding_budget2] if a!=0])
  2599. if len(_set)>1:
  2600. return False
  2601. return True
  2602. def getUpdate_dict(self,_dict):
  2603. update_dict = {}
  2604. for k,v in _dict.items():
  2605. if v is None:
  2606. continue
  2607. if isinstance(v,str):
  2608. if v=="":
  2609. continue
  2610. if isinstance(v,(float,int)):
  2611. if v==0:
  2612. continue
  2613. update_dict[k] = v
  2614. return update_dict
def update_projects_by_document(self,docid,save,projects):
    '''
    Merge one document's extracted properties into the given project set.

    Common (project-level) properties are overwritten, list-like properties
    (docids, codes, products, dynamics, nlp enterprises) are unioned, and
    per-package properties are matched into existing packages by
    name/winner/budget keys; unmatched packages become new projects.

    :param docid: the document id being merged in (or removed when save!=1)
    :param save: 1 to add the docid to each project, anything else removes it
    :param projects: list of project dicts, mutated in place (may grow)
    :return: None
    '''
    list_docs = self.search_docs([docid])
    docs = [_doc.getProperties() for _doc in list_docs]
    project_dict = generate_common_properties(docs)
    list_package_properties = generate_packages_properties(docs)
    _dict = {}
    # Overwrite common properties on every project.
    # Area fields are only propagated when the new district is informative.
    _replace_replace = False
    v = project_dict.get(document_district,"")
    if not (v is None or v=="" or v=="[]" or v=="未知"):
        _replace_replace = True
    for k,v in project_dict.items():
        if not _replace_replace:
            if k in [document_district,document_city,document_province,document_area]:
                continue
        if v is None or v=="" or v=="[]" or v=="未知":
            continue
        # Accumulative fields are handled separately below, never overwritten.
        if k in (project_project_dynamics,project_product,project_project_codes,project_docids):
            continue
        _dict[k] = v
    for _proj in projects:
        _proj.update(_dict)
    for _proj in projects:
        # Keep the latest page_time on each project.
        if _proj.get(project_page_time,"")<project_dict.get(project_page_time,""):
            _proj[project_page_time] = project_dict.get(project_page_time,"")
    # Append/union accumulative properties per project.
    # NOTE(review): indentation reconstructed — the whole section below is
    # taken to run once per project; set_code/set_product/set_nlp_* carry
    # over between iterations while set_docid is rebuilt each time.
    append_dict = {}
    set_docid = set()
    set_product = set()
    set_code = set()
    set_nlp_enterprise = set()
    set_nlp_enterprise_attachment = set()
    for _proj in projects:
        _docids = _proj.get(project_docids,"")
        _codes = _proj.get(project_project_codes,"")
        _product = _proj.get(project_product,"")
        set_docid = set(_docids.split(","))
        if save==1:
            set_docid.add(str(docid))
        else:
            if str(docid) in set_docid:
                set_docid.remove(str(docid))
        set_code = set_code | set(_codes.split(","))
        set_product = set_product | set(_product.split(","))
        try:
            set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
            set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
        except Exception as e:
            pass
        # Fold in the incoming document's own codes/products/enterprises.
        set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
        set_product = set_product | set(project_dict.get(project_product,"").split(","))
        try:
            set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
            set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
        except Exception as e:
            pass
        append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
        append_dict[project_docid_number] = len(set_docid)
        append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
        append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
        # Enterprise lists are capped at 100 entries.
        append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
        append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
        # Merge project dynamics by docid (doc's entry wins), sort by page_time.
        dict_dynamic = {}
        set_docid = set()
        _dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
        for _dy in _dynamic:
            _docid = _dy.get("docid")
            dict_dynamic[_docid] = _dy
        _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
        for _dy in _dynamic:
            _docid = _dy.get("docid")
            dict_dynamic[_docid] = _dy
        list_dynamics = []
        for k,v in dict_dynamic.items():
            list_dynamics.append(v)
        list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
        append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
        _proj.update(append_dict)
    # Index existing projects by package keys for the private-property match.
    dict_package = {}
    for _pp in projects:
        _counts = 0
        sub_project_name = _pp.get(project_sub_project_name,"")
        if sub_project_name=="Project":
            sub_project_name = ""
        win_tenderer = _pp.get(project_win_tenderer,"")
        win_bid_price = _pp.get(project_win_bid_price,0)
        bidding_budget = _pp.get(project_bidding_budget,0)
        if win_tenderer!="" and bidding_budget!=0:
            _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
            dict_package[_key] = _pp
            _counts += 1
        if win_tenderer!="" and win_bid_price!=0:
            _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
            dict_package[_key] = _pp
            _counts +=1
        # Weaker single-field keys are only used when no strong key exists.
        if _counts==0:
            if win_tenderer!="":
                _key = "%s-%s"%(sub_project_name,win_tenderer)
                dict_package[_key] = _pp
                _counts += 1
            if bidding_budget!=0:
                _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                dict_package[_key] = _pp
                _counts += 1
    # Update private (per-package) properties; try keys strongest-first.
    for _pp in list_package_properties:
        flag_update = False
        sub_project_name = _pp.get(project_sub_project_name,"")
        if sub_project_name=="Project":
            sub_project_name = ""
        win_tenderer = _pp.get(project_win_tenderer,"")
        win_bid_price = _pp.get(project_win_bid_price,0)
        bidding_budget = _pp.get(project_bidding_budget,0)
        if win_tenderer!="" and bidding_budget!=0:
            _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if win_tenderer!="" and win_bid_price!=0:
            _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if win_tenderer!="":
            _key = "%s-%s"%(sub_project_name,win_tenderer)
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        if bidding_budget!=0:
            _key = "%s-%s"%(sub_project_name,str(bidding_budget))
            if _key in dict_package:
                if self.is_same_package(_pp,dict_package[_key]):
                    ud = self.getUpdate_dict(_pp)
                    self.set_project_uuid(ud,dict_package[_key].get("uuid"))
                    dict_package[_key].update(ud)
                    flag_update = True
                    continue
        # No existing package matched: promote this package to a new project
        # (carrying the common properties) and register its keys.
        if not flag_update:
            _pp.update(project_dict)
            projects.append(_pp)
            _counts = 0
            if win_tenderer!="" and bidding_budget!=0:
                _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                dict_package[_key] = _pp
                _counts += 1
            if win_tenderer!="" and win_bid_price!=0:
                _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                dict_package[_key] = _pp
                _counts +=1
            if _counts==0:
                if win_tenderer!="":
                    _key = "%s-%s"%(sub_project_name,win_tenderer)
                    dict_package[_key] = _pp
                    _counts += 1
                if bidding_budget!=0:
                    _key = "%s-%s"%(sub_project_name,str(bidding_budget))
                    dict_package[_key] = _pp
                    _counts += 1
  2792. def delete_projects_by_document(self,docid):
  2793. '''
  2794. 更新projects中对应的document的属性
  2795. :param docid:
  2796. :param projects: 项目集合
  2797. :param action:add/delete add时附加唯一属性,delete时删除唯一属性
  2798. :return:
  2799. '''
  2800. set_docid = set()
  2801. list_delete_projects = []
  2802. list_projects = self.search_projects_with_document([docid])
  2803. for _proj in list_projects:
  2804. _p = {}
  2805. _docids = _proj.get(project_docids,"")
  2806. print(_proj.get(project_uuid))
  2807. _p["delete_uuid"] = _proj.get(project_uuid)
  2808. _p["to_delete"] = True
  2809. list_delete_projects.append(_p)
  2810. if _docids!="":
  2811. set_docid = set_docid | set(_docids.split(","))
  2812. if str(docid) in set_docid:
  2813. set_docid.remove(str(docid))
  2814. list_docid = list(set_docid)
  2815. list_projects = []
  2816. if len(list_docid)>0:
  2817. list_docs = self.search_docs(list_docid)
  2818. list_projects = self.generate_projects_from_document(list_docs)
  2819. list_projects = dumplicate_projects(list_projects)
  2820. list_projects.extend(list_delete_projects)
  2821. project_json = to_project_json(list_projects)
  2822. print("delete_json",project_json)
  2823. return project_json
def delete_doc_handle(self,_dict,result_queue):
    '''
    Consume one delete message from the MQ: rebuild the affected projects
    via delete_projects_by_document and persist the result into the
    project_process table; the message is acked only after a successful write
    so failures are redelivered.

    :param _dict: {"frame": stomp frame (headers+body), "conn": MQ connection}
    :param result_queue: unused here (handler signature requirement)
    :return: None
    '''
    headers = _dict.get("frame")
    conn = _dict.get("conn")
    log("==========delete")
    if headers is not None:
        message_id = headers.headers["message-id"]
        body = headers.body
        item = json.loads(body)
        docid = item.get("docid")
        # NOTE(review): a message without docid returns without acking —
        # presumably left for redelivery/expiry; confirm intent.
        if docid is None:
            return
        delete_result = self.delete_projects_by_document(docid)
        _uuid = uuid4().hex
        _d = {PROJECT_PROCESS_UUID:_uuid,
              PROJECT_PROCESS_CRTIME:1,
              PROJECT_PROCESS_PROJECTS:delete_result}
        _pp = Project_process(_d)
        # Ack only after the row write succeeds.
        if _pp.update_row(self.ots_client):
            ackMsg(conn,message_id)
        # Results are no longer pushed to the MQ result queue; they are
        # written to the project_process table instead.
        # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
        #     ackMsg(conn,message_id)
def generate_common_properties(self,list_docs):
    '''
    Build the project-level ("common") properties shared by a group of
    documents: majority-vote scalar fields, a consistent area (district/city/
    province/area), merged code/product lists, earliest zhaobiao/zhongbiao
    page times, and the per-document dynamics list.

    :param list_docs: Document/Document_tmp objects belonging to one project
    :return: dict of project properties
    '''
    # Majority vote: count each non-empty value per field across documents.
    choose_dict = {}
    project_dict = {}
    for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
        for _doc in list_docs:
            _value = _doc.getProperties().get(_key,"")
            if _value!="":
                if _key not in choose_dict:
                    choose_dict[_key] = {}
                if _value not in choose_dict[_key]:
                    choose_dict[_key][_value] = 0
                choose_dict[_key][_value] += 1
    # Area: find the first granularity level with an informative value.
    _find = False
    for _key in [document_district,document_city,document_province,document_area]:
        area_dict = {}
        for _doc in list_docs:
            loc = _doc.getProperties().get(_key,"未知")
            if loc not in ('全国','未知',"0"):
                if loc not in area_dict:
                    area_dict[loc] = 0
                area_dict[loc] += 1
        list_loc = []
        for k,v in area_dict.items():
            list_loc.append([k,v])
        list_loc.sort(key=lambda x:x[1],reverse=True)
        if len(list_loc)>0:
            # NOTE(review): `_doc` here is the loop-leak variable from the
            # scan above (i.e. the LAST document), not the document holding
            # the top-voted location, and the sorted list_loc is otherwise
            # unused — looks suspicious, confirm intent before changing.
            project_dict[document_district] = _doc.getProperties().get(document_district)
            project_dict[document_city] = _doc.getProperties().get(document_city)
            project_dict[document_province] = _doc.getProperties().get(document_province)
            project_dict[document_area] = _doc.getProperties().get(document_area)
            _find = True
            break
    if not _find:
        # No informative location anywhere: fall back to the first document.
        if len(list_docs)>0:
            project_dict[document_district] = list_docs[0].getProperties().get(document_district)
            project_dict[document_city] = list_docs[0].getProperties().get(document_city)
            project_dict[document_province] = list_docs[0].getProperties().get(document_province)
            project_dict[document_area] = list_docs[0].getProperties().get(document_area)
    # Resolve the vote: most frequent value wins; skip a "nationwide/unknown"
    # winner when a more specific runner-up exists.
    for _key,_value in choose_dict.items():
        _l = []
        for k,v in _value.items():
            _l.append([k,v])
        _l.sort(key=lambda x:x[1],reverse=True)
        if len(_l)>0:
            _v = _l[0][0]
            if _v in ('全国','未知'):
                if len(_l)>1:
                    _v = _l[1][0]
            project_dict[_key] = _v
    # Per-document pass: dynamics, visible docids, earliest channel times.
    list_dynamics = []
    docid_number = 0
    visuable_docids = []
    zhao_biao_page_time = ""
    zhong_biao_page_time = ""
    list_codes = []
    list_product = []
    p_page_time = ""
    remove_docids = set()
    for _doc in list_docs:
        table_name = _doc.getProperties().get("table_name")
        status = _doc.getProperties().get(document_status,0)
        _save = _doc.getProperties().get(document_tmp_save,1)
        doctitle = _doc.getProperties().get(document_doctitle,"")
        docchannel = _doc.getProperties().get(document_docchannel)
        page_time = _doc.getProperties().get(document_page_time,"")
        _docid = _doc.getProperties().get(document_docid)
        _bidway = _doc.getProperties().get(document_bidway,"")
        _docchannel = _doc.getProperties().get(document_life_docchannel,0)
        project_codes = _doc.getProperties().get(document_project_codes)
        product = _doc.getProperties().get(document_product)
        sub_docs = _doc.getProperties().get("sub_docs",[])
        is_multipack = True if len(sub_docs)>1 else False
        extract_count = _doc.getProperties().get(document_tmp_extract_count,0)
        if product is not None:
            list_product.extend(product.split(","))
        if project_codes is not None:
            _c = project_codes.split(",")
            list_codes.extend(_c)
        if p_page_time=="":
            p_page_time = page_time
        # Channel groups: 51/52/102/103/114 -> zhaobiao (tendering) docs,
        # 101/118/119/120 -> zhongbiao (award) docs; keep the first time seen.
        if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
            zhao_biao_page_time = page_time
        if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
            zhong_biao_page_time = page_time
        # Visibility: document table rows need status 201..300, tmp rows
        # need save==1; everything else is recorded for removal.
        is_visuable = 0
        if table_name=="document":
            if status>=201 and status<=300:
                docid_number +=1
                visuable_docids.append(str(_docid))
                is_visuable = 1
            else:
                remove_docids.add(str(_docid))
        else:
            if _save==1:
                docid_number +=1
                visuable_docids.append(str(_docid))
                is_visuable = 1
            else:
                remove_docids.add(str(_docid))
        list_dynamics.append({document_docid:_docid,
                              document_doctitle:doctitle,
                              document_docchannel:_docchannel,
                              document_bidway:_bidway,
                              document_page_time:page_time,
                              document_status:201 if is_visuable==1 else 401,
                              "is_multipack":is_multipack,
                              document_tmp_extract_count:extract_count
                              }
                             )
    project_dict[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
    project_dict[project_docid_number] = docid_number
    project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
    if zhao_biao_page_time !="":
        project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
    if zhong_biao_page_time !="":
        project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
    project_dict[project_project_codes] = ",".join(list(set(list_codes)))
    project_dict[project_page_time] = p_page_time
    project_dict[project_product] = ",".join(list(set(list_product)))
    return project_dict
  2972. def generate_packages_properties(self,list_docs):
  2973. '''
  2974. 生成分包属性
  2975. :param list_docs:
  2976. :return:
  2977. '''
  2978. list_properties = []
  2979. set_key = set()
  2980. for _doc in list_docs:
  2981. _dict = {}
  2982. sub_docs = _doc.getProperties().get("sub_docs")
  2983. if sub_docs is not None:
  2984. for _d in sub_docs:
  2985. sub_project_code = _d.get(project_sub_project_code,"")
  2986. sub_project_name = _d.get(project_sub_project_name,"")
  2987. win_tenderer = _d.get(project_win_tenderer,"")
  2988. win_bid_price = _d.get(project_win_bid_price,"")
  2989. _key = "%s-%s-%s-%s"%(sub_project_code,sub_project_name,win_tenderer,win_bid_price)
  2990. if _key in set_key:
  2991. continue
  2992. set_key.add(_key)
  2993. list_properties.append(_d)
  2994. return list_properties
  2995. def generate_projects_from_document(self,list_docs):
  2996. '''
  2997. #通过公告生成projects
  2998. :param list_docids:
  2999. :return:
  3000. '''
  3001. #判断标段数
  3002. list_projects = generate_projects([doc.getProperties() for doc in list_docs])
  3003. return list_projects
  3004. def search_projects_with_document(self,list_docids):
  3005. '''
  3006. 通过docid集合查询对应的projects
  3007. :param list_docids:
  3008. :return:
  3009. '''
  3010. print("==",list_docids)
  3011. list_should_q = []
  3012. for _docid in list_docids:
  3013. list_should_q.append(TermQuery("docids",_docid))
  3014. bool_query = BoolQuery(should_queries=list_should_q)
  3015. _query = {"query":bool_query,"limit":20}
  3016. list_project_dict = getDocument(_query,self.ots_client,[
  3017. project_uuid,project_docids,project_zhao_biao_page_time,
  3018. project_zhong_biao_page_time,
  3019. project_page_time,
  3020. project_area,
  3021. project_province,
  3022. project_city,
  3023. project_district,
  3024. project_info_type,
  3025. project_industry,
  3026. project_qcodes,
  3027. project_project_name,
  3028. project_project_code,
  3029. project_project_codes,
  3030. project_project_addr,
  3031. project_tenderee,
  3032. project_tenderee_addr,
  3033. project_tenderee_phone,
  3034. project_tenderee_contact,
  3035. project_agency,
  3036. project_agency_phone,
  3037. project_agency_contact,
  3038. project_sub_project_name,
  3039. project_sub_project_code,
  3040. project_bidding_budget,
  3041. project_win_tenderer,
  3042. project_win_bid_price,
  3043. project_win_tenderer_manager,
  3044. project_win_tenderer_phone,
  3045. project_second_tenderer,
  3046. project_second_bid_price,
  3047. project_second_tenderer_manager,
  3048. project_second_tenderer_phone,
  3049. project_third_tenderer,
  3050. project_third_bid_price,
  3051. project_third_tenderer_manager,
  3052. project_third_tenderer_phone,
  3053. project_procurement_system,
  3054. project_bidway,
  3055. project_dup_data,
  3056. project_docid_number,
  3057. project_project_dynamics,
  3058. project_product,
  3059. project_moneysource,
  3060. project_service_time,
  3061. project_time_bidclose,
  3062. project_time_bidopen,
  3063. project_time_bidstart,
  3064. project_time_commencement,
  3065. project_time_completion,
  3066. project_time_earnest_money_start,
  3067. project_time_earnest_money_end,
  3068. project_time_get_file_end,
  3069. project_time_get_file_start,
  3070. project_time_publicity_end,
  3071. project_time_publicity_start,
  3072. project_time_registration_end,
  3073. project_time_registration_start,
  3074. project_time_release,
  3075. project_dup_docid,
  3076. project_info_source,
  3077. project_nlp_enterprise,
  3078. project_nlp_enterprise_attachment,
  3079. ],sort="page_time",table_name="project2",table_index="project2_index")
  3080. return list_project_dict
  3081. def set_project_uuid(self,_dict,_uuid):
  3082. if _uuid is not None and _uuid!="":
  3083. if "uuid" in _dict:
  3084. _dict["uuid"] = "%s,%s"%(_dict["uuid"],_uuid)
  3085. else:
  3086. _dict["uuid"] = _uuid
    def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district):
        '''
        Build the list of candidate search rules used to find stored projects that
        may be merged with the current one.

        Each rule is a pair [queries, weight]: `queries` is a list of OTS query
        objects that must all match, and `weight` is later used by merge_projects to
        enlarge the per-batch search limit (_limit += weight*5). Rules pair strong
        identifying fields (tenderee/code, winner/price, code/product, ...) so that
        candidates can be fetched cheaply and then validated by check_merge_rule.

        :param page_time: project publish date (not used when building rules)
        :param project_codes: comma-joined project code candidates
        :param project_name: project name ("" when unknown)
        :param tenderee: tenderee name ("" when unknown)
        :param agency: agency name ("" when unknown)
        :param product: comma-joined product list
        :param sub_project_name: sub-project name (not used here)
        :param bidding_budget: bidding budget; <=0 means unknown
        :param win_tenderer: winning bidder name ("" when unknown)
        :param win_bid_price: winning bid price; <=0 means unknown
        :param province: province name; ""/"全国"/"未知" carry no locating power
        :param city: city name; same sentinel values as province
        :param district: district name; same sentinel values as province
        :return: list of [list_of_queries, weight] pairs
        '''
        whole_time_start = time.time()
        _time = time.time()
        list_query = []
        # split the comma-joined code string, dropping empty entries
        list_code = [a for a in project_codes.split(",") if a!='']
        # same codes matched against the multi-value codes field (first 20 only)...
        should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
        # print("should_q_code",[a for a in list_code[:20]])
        # ...and against the single-value code field
        should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
        list_product = [a for a in product.split(",") if a!='']
        should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
        should_q_area = None
        if province!="" or city!="" or district!="":
            should_q = []
            if province not in ("","全国","未知") and province is not None:
                should_q.append(TermQuery(project_province,province))
            if city not in ("","全国","未知") and city is not None:
                should_q.append(TermQuery(project_city,city))
            if district not in ("","全国","未知") and district is not None:
                should_q.append(TermQuery(project_district,district))
            if len(should_q)>0:
                should_q_area = BoolQuery(should_queries=should_q)
        prepare_time = time.time()-_time
        _time = time.time()
        # log("list_code %s"%(str(list_code)))
        # log("list_product %s"%(str(list_product)))
        # log("tenderee %s"%(tenderee))
        # log("bidding_budget %s"%(bidding_budget))
        # log("win_tenderer %s"%(win_tenderer))
        # log("win_bid_price %s"%(win_bid_price))
        # log("project_name %s"%(project_name))
        log_time = time.time()-_time
        _time = time.time()
        # tenderee + project code (tried against both code fields)
        if tenderee!="" and len(list_code)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_code,
                      ]
            list_query.append([_query,2])
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_cod
                      ]
            list_query.append([_query,2])
        # tenderee + product
        if tenderee!="" and len(list_product)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      should_q_product]
            list_query.append([_query,1])
        # tenderee + project name
        if tenderee!="" and project_name!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_project_name,project_name)]
            list_query.append([_query,2])
        # tenderee + agency (weak rule, weight 0)
        if tenderee!="" and agency!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_agency,agency)]
            list_query.append([_query,0])
        # tenderee + bidding budget
        if tenderee!="" and float(bidding_budget)>0:
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_bidding_budget,bidding_budget)]
            list_query.append([_query,2])
        # bidding budget + winning price
        if float(bidding_budget)>0 and float(win_bid_price)>0:
            _query = [TermQuery(project_bidding_budget,bidding_budget),
                      TermQuery(project_win_bid_price,win_bid_price)]
            list_query.append([_query,2])
        # tenderee + winner
        if tenderee!="" and win_tenderer!="":
            _query = [TermQuery(project_tenderee,tenderee),
                      TermQuery(project_win_tenderer,win_tenderer)]
            list_query.append([_query,2])
        # agency + winner (weak rule)
        if agency!="" and win_tenderer!="":
            _query = [TermQuery(project_agency,agency),
                      TermQuery(project_win_tenderer,win_tenderer)]
            list_query.append([_query,0])
        # agency + product
        if agency!="" and len(list_product)>0:
            _query = [TermQuery(project_agency,agency),
                      should_q_product]
            list_query.append([_query,1])
        # winner + project code (both code fields)
        if win_tenderer!="" and len(list_code)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      should_q_code]
            list_query.append([_query,2])
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      should_q_cod]
            list_query.append([_query,2])
        # winner + winning price
        if win_tenderer!="" and float(win_bid_price)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      TermQuery(project_win_bid_price,win_bid_price)]
            list_query.append([_query,2])
        # winner + bidding budget
        if win_tenderer!="" and float(bidding_budget)>0:
            _query = [TermQuery(project_win_tenderer,win_tenderer),
                      TermQuery(project_bidding_budget,bidding_budget)]
            list_query.append([_query,2])
        # project code + product
        if len(list_code)>0 and len(list_product)>0:
            _query = [should_q_code,
                      should_q_product]
            list_query.append([_query,2])
        # project code alone (both code fields)
        if len(list_code)>0:
            _query = [
                should_q_code]
            list_query.append([_query,2])
            _query = [
                should_q_cod]
            list_query.append([_query,1])
        # project name alone, plus a phrase match against the document titles
        if project_name!="" and project_name is not None:
            _query = [
                TermQuery(project_project_name,project_name)]
            list_query.append([_query,1])
            _query_title = [MatchPhraseQuery(project_doctitles,project_name)]
            list_query.append([_query_title,1])
        # area + product (weak rule)
        if len(list_product)>0 and should_q_area is not None:
            _query = [should_q_area,
                      should_q_product]
            list_query.append([_query,0])
        generate_time = time.time()-_time
        whole_time = time.time()-whole_time_start
        # log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
        return list_query
  3200. def merge_projects(self,list_projects,b_log=False,check_columns=[project_uuid,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_project_name,project_project_code,project_project_codes,project_tenderee,project_agency,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_project_dynamics,project_product,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_nlp_enterprise,project_nlp_enterprise_attachment,project_docids,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source],fix_columns=[project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_b
id_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source]):
  3201. '''
  3202. 对项目进行合并
  3203. :return:
  3204. '''
  3205. try:
  3206. whole_time_start = time.time()
  3207. set_uuid = set()
  3208. for _proj in list_projects:
  3209. _uuid = _proj.get("uuid")
  3210. if _uuid is not None:
  3211. set_uuid = set_uuid | set(_uuid.split(","))
  3212. must_not_q = []
  3213. for _uuid in list(set_uuid):
  3214. must_not_q.append(TermQuery("uuid",_uuid))
  3215. print("must_not_q uuid:%s"%(_uuid))
  3216. projects_merge_count = 0
  3217. projects_check_rule_time = 0
  3218. projects_update_time = 0
  3219. projects_query_time = 0
  3220. projects_prepare_time = 0
  3221. current_date = getCurrent_date("%Y-%m-%d")
  3222. min_date = timeAdd(current_date,-35,format="%Y-%m-%d")
  3223. search_table = "project2"
  3224. search_table_index = "project2_index_formerge"
  3225. project_cls = Project
  3226. docids = ""
  3227. for _proj in list_projects[:30]:
  3228. docids = _proj.get(project_docids,"")
  3229. page_time = _proj.get(project_page_time,"")
  3230. project_codes = _proj.get(project_project_codes,"")
  3231. project_name = _proj.get(project_project_name,"")
  3232. tenderee = _proj.get(project_tenderee,"")
  3233. agency = _proj.get(project_agency,"")
  3234. product = _proj.get(project_product,"")
  3235. sub_project_name = _proj.get(project_sub_project_name,"")
  3236. bidding_budget = _proj.get(project_bidding_budget,-1)
  3237. win_tenderer = _proj.get(project_win_tenderer,"")
  3238. win_bid_price = _proj.get(project_win_bid_price,-1)
  3239. _dynamic = _proj.get(project_project_dynamics,"[]")
  3240. is_yanshou = False
  3241. list_dynamic = json.loads(_dynamic)
  3242. for _d in list_dynamic:
  3243. _title = _d.get("doctitle","")
  3244. if re.search("验收公[示告]",_title) is not None:
  3245. is_yanshou = True
  3246. break
  3247. province = _proj.get(project_province,"")
  3248. city = _proj.get(project_city,"")
  3249. district = _proj.get(project_district,"")
  3250. if is_yanshou:
  3251. page_time_less = timeAdd(page_time,-750)
  3252. page_time_greater = timeAdd(page_time,720)
  3253. else:
  3254. page_time_less = timeAdd(page_time,-450)
  3255. page_time_greater = timeAdd(page_time,420)
  3256. sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
  3257. _time = time.time()
  3258. list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
  3259. list_merge_data = []
  3260. search_table = "project2"
  3261. search_table_index = "project2_index_formerge"
  3262. project_cls = Project
  3263. # print("page_time,min_date",page_time,min_date)
  3264. # if page_time>=min_date:
  3265. # search_table = "project2_tmp"
  3266. # search_table_index = "project2_tmp_index"
  3267. # project_cls = Project_tmp
  3268. _step = 2
  3269. _begin = 0
  3270. must_queries = []
  3271. if page_time_less is not None and page_time_greater is not None:
  3272. must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
  3273. ]
  3274. #sub_project_name非必要条件
  3275. # if sub_project_q is not None:
  3276. # must_queries.append(sub_project_q)
  3277. projects_prepare_time += time.time()-_time
  3278. _time = time.time()
  3279. sort_type = SortOrder.DESC
  3280. while _begin<len(list_must_query):
  3281. if sort_type==SortOrder.DESC:
  3282. sort_type=SortOrder.ASC
  3283. if sort_type==SortOrder.ASC:
  3284. sort_type=SortOrder.DESC
  3285. list_should_q = []
  3286. _limit = 10
  3287. for must_q,_count in list_must_query[_begin:_begin+_step]:
  3288. must_q1 = list(must_q)
  3289. must_q1.extend(must_queries)
  3290. list_should_q.append(BoolQuery(must_queries=must_q1))
  3291. _limit += _count*5
  3292. _query = BoolQuery(
  3293. should_queries=list_should_q,
  3294. must_not_queries=must_not_q[:100]
  3295. )
  3296. # rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
  3297. # SearchQuery(_query,limit=_limit),
  3298. # columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
  3299. rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search(search_table,search_table_index,
  3300. SearchQuery(_query,sort=Sort(sorters=[FieldSort(project_page_time,sort_type)]),limit=_limit),
  3301. columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
  3302. list_data = getRow_ots(rows)
  3303. list_merge_data.extend(list_data)
  3304. # print(list_data)
  3305. for _data in list_data:
  3306. must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
  3307. _begin += _step
  3308. projects_query_time += time.time()-_time
  3309. #优先匹配招标金额相近的
  3310. projects_merge_count = len(list_merge_data)
  3311. list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
  3312. list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
  3313. # log(page_time_less+"=="+page_time_greater)
  3314. # log("list_merge_data:%s"%(str(list_merge_data)))
  3315. list_check_data = []
  3316. for _data in list_merge_data:
  3317. _time = time.time()
  3318. _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
  3319. if b_log:
  3320. log(str(_check))
  3321. projects_check_rule_time += time.time()-_time
  3322. if _check:
  3323. list_check_data.append([_data,_prob])
  3324. list_check_data.sort(key=lambda x:x[1],reverse=True)
  3325. for _data,_ in list_check_data:
  3326. _time = time.time()
  3327. _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
  3328. projects_check_rule_time += time.time()-_time
  3329. _time = time.time()
  3330. if _check:
  3331. # o_proj = project_cls(_data)
  3332. # o_proj.fix_columns(self.ots_client,fix_columns,True)
  3333. # for k in fix_columns:
  3334. # _data[k] = o_proj.getProperties().get(k)
  3335. update_projects_by_project(_data,[_proj])
  3336. projects_update_time += time.time()-_time
  3337. whole_time = time.time()-whole_time_start
  3338. log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
  3339. return list_projects
  3340. except Exception as e:
  3341. traceback.print_exc()
  3342. assert 1==2
  3343. def merge_document_real(self,item,dup_docid,table_name,save,status_to=None,b_log=False):
  3344. '''
  3345. 实时项目合并
  3346. :param item:
  3347. :param dup_docid:重复的公告集合
  3348. :param status_to:
  3349. :return:
  3350. '''
  3351. try:
  3352. list_docids = []
  3353. _docid = item.get(document_tmp_docid)
  3354. list_docids.append(_docid)
  3355. if isinstance(dup_docid,list):
  3356. list_docids.extend(dup_docid)
  3357. list_docids = [a for a in list_docids if a is not None]
  3358. _time = time.time()
  3359. list_projects = self.search_projects_with_document(list_docids)
  3360. # log("search projects takes:%.3f"%(time.time()-_time))
  3361. if len(list_projects)==0:
  3362. # _time = time.time()
  3363. list_docs = self.search_docs(list_docids)
  3364. # log("search document takes:%.3f"%(time.time()-_time))
  3365. # _time = time.time()
  3366. list_projects = self.generate_projects_from_document(list_docs)
  3367. # log("generate projects takes:%.3f"%(time.time()-_time))
  3368. else:
  3369. _time = time.time()
  3370. self.update_projects_by_document(_docid,save,list_projects)
  3371. # log("update projects takes:%.3f"%(time.time()-_time))
  3372. _time = time.time()
  3373. list_projects = dumplicate_projects(list_projects)
  3374. # log("dumplicate projects takes:%.3f"%(time.time()-_time))
  3375. _time = time.time()
  3376. list_projects = self.merge_projects(list_projects,b_log)
  3377. # log("merge projects takes:%.3f"%(time.time()-_time))
  3378. _time = time.time()
  3379. list_merge_dump = dumplicate_document_in_merge(list_projects,dup_docid[:-1])
  3380. # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
  3381. _time = time.time()
  3382. project_json = to_project_json(list_projects)
  3383. # log("json projects takes:%.3f"%(time.time()-_time))
  3384. if b_log:
  3385. log("project_json:%s"%project_json)
  3386. return project_json,list_merge_dump
  3387. except Exception as e:
  3388. raise RuntimeError("error on dumplicate")
  3389. def is_exist_fingerprint(self,final_list,_docid,_fingerprint,table_name):
  3390. set_fingerprint = set()
  3391. for _i in range(1,len(final_list)):
  3392. _dict = final_list[_i]
  3393. b_docid = _dict[document_tmp_docid]
  3394. _save = _dict.get(document_tmp_save,0)
  3395. _status = _dict.get(document_tmp_status,0)
  3396. if table_name=="document":
  3397. if _status>=201 and _status<=300:
  3398. _save = 1
  3399. fingerprint_less = _dict.get(document_tmp_fingerprint,"")
  3400. if b_docid==_docid:
  3401. pass
  3402. else:
  3403. if _save==1:
  3404. set_fingerprint.add(fingerprint_less)
  3405. if _fingerprint in set_fingerprint:
  3406. return True
  3407. return False
  3408. def check_page_time(self,item):
  3409. page_time = item.get(document_page_time,"")
  3410. has_before = False
  3411. has_after = False
  3412. if len(page_time)>0:
  3413. l_page_time = timeAdd(page_time,days=-90)
  3414. dict_time = item.get("dict_time",{})
  3415. for k,v in dict_time.items():
  3416. if v is not None and len(v)>0:
  3417. if l_page_time>v:
  3418. has_before = True
  3419. if v>page_time:
  3420. has_after = True
  3421. if not has_after and has_before:
  3422. log("check page_time false %s==%s-%s"%(l_page_time,k,v))
  3423. return False
  3424. return True
    def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
        '''
        Consumer side of the dedup flow: find duplicates of `item`, pick the best
        docid, merge the related projects and (when `upgrade`) persist the result.

        :param item: document dict (extract_json already available)
        :param result_queue: unused; kept for the handler signature
        :param ots_client: unused; self.ots_client is used instead
        :param get_all: forwarded to translate_dumplicate_rules
        :param upgrade: when True, write dup/best docids and projects back to OTS
        '''
        try:
            start_time = time.time()
            self.post_extract(item)
            base_list = []
            set_docid = set()
            # build the dedup search rules for this document
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=False)
            # print("len_rules",len(list_rules),table_name,table_index)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            _i = 0
            step = 5
            # the document itself always wins confidence comparisons
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            # run the rules in batches of `step`, excluding already-collected docids
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                # the keys of the batch's highest-confidence rule drive the dedup check
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle])
                _i += step
            b_log = False if upgrade else True
            _time = time.time()
            # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            final_list = self.dumplicate_fianl_check(base_list,b_log)
            # True when another kept document already carries this fingerprint
            exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),table_name)
            # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            final_list_docid = [a["docid"] for a in final_list]
            # log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
            _d = {"partitionkey":item["partitionkey"],
                  "docid":item["docid"],
                  "status":random.randint(*flow_dumplicate_status_to),
                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                  }
            dtmp = Document_tmp(_d)
            # docids of all duplicates, excluding the current document itself
            dup_docid = set()
            for _dict in final_list:
                dup_docid.add(_dict.get(document_tmp_docid))
            if item.get(document_tmp_docid) in dup_docid:
                dup_docid.remove(item.get(document_tmp_docid))
            remove_list = []
            # keep this document when its page_time is plausible and it is the best one
            if self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid)):
                dtmp.setValue(document_tmp_save,1,True)
                # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                for _dict in final_list:
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
            else:
                # another document wins: this one is demoted to save=0
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    # prepend the winner so it is recoverable from dup_docid
                    dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                else:
                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                    for _dict in final_list:
                        if _dict.get(document_tmp_docid) in dup_docid:
                            remove_list.append(_dict)
            list_docids = list(dup_docid)
            list_docids.append(best_docid)
            # forced-update documents are always kept
            if item.get(document_update_document)=="true":
                dtmp.setValue(document_tmp_save,1,True)
            list_merge_dump = []
            if exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0:
                # exact-fingerprint duplicate that is not kept: skip the merge entirely
                log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                dtmp.setValue(document_tmp_projects,"[]",True)
            else:
                project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
                dtmp.setValue(document_tmp_projects,project_json,True)
            log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
            if upgrade:
                # print(dtmp.getProperties())
                dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
                dtmp.setValue(document_tmp_best_docid,best_docid,True)
                _flag = dtmp.update_row(self.ots_client)
                if not _flag:
                    # write failed (presumably row too large -- TODO confirm): halve
                    # the projects payload until the update succeeds
                    for i in range(10):
                        list_proj_json = dtmp.getProperties().get(document_tmp_projects)
                        if list_proj_json is not None:
                            list_proj = json.loads(list_proj_json)
                            dtmp.setValue(document_tmp_projects,json.dumps(list_proj[:len(list_proj)//2]),True)
                            if dtmp.update_row(self.ots_client):
                                break
                if table_name=="document_tmp":
                    self.changeSaveStatus(remove_list)
                    self.changeSaveStatus(list_merge_dump)
            # log("dump takes %.2f"%(time.time()-start_time))
        except Exception as e:
            traceback.print_exc()
            log("error on dumplicate of %s"%(str(item.get(document_tmp_docid))))
    def fix_doc_which_not_in_project(self):
        '''
        Pull finished documents that are missing from project2 and push them back
        into document_tmp so they are deduplicated and merged again.

        Pages through saved documents (resuming from self.fix_doc_docid when set),
        then checks each docid against project2 in a thread pool.
        :return: None
        '''
        def fix_doc_handle(item,result_queue):
            # reset the document's status when no project references its docid
            _docid = item.get(document_tmp_docid)
            b_q = BoolQuery(must_queries=[TermQuery(project_docids,str(_docid))])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
                                                                                SearchQuery(b_q,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            if total_count==0:
                log("fix_doc:%s not in project2"%(str(_docid)))
                d_tmp = Document_tmp(item)
                d_tmp.setValue(document_tmp_status,flow_dumplicate_status_from[0],True)
                d_tmp.update_row(self.ots_client)
        if self.fix_doc_docid is None:
            # first run: look at documents processed at least 5 minutes ago
            current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
            before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
            bool_query = BoolQuery(must_queries=[
                TermQuery(document_tmp_save,1),
                RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
                RangeQuery(document_tmp_opertime,before_date)
            ])
        else:
            # subsequent runs: resume after the last processed docid
            bool_query = BoolQuery(must_queries=[
                TermQuery(document_tmp_save,1),
                RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
                RangeQuery(document_tmp_docid,self.fix_doc_docid)
            ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
                                                                            ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_d = getRow_ots(rows)
        list_data.extend(list_d)
        # page through the remaining results
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_d = getRow_ots(rows)
            list_data.extend(list_d)
            print("%d/%d"%(len(list_data),total_count))
        if len(list_data)>0:
            # remember the resume point for the next scheduled run
            self.fix_doc_docid = list_data[-1].get(document_tmp_docid)
            log("current fix_doc_docid:%s"%(str(self.fix_doc_docid)))
        task_queue = Queue()
        for _data in list_data:
            task_queue.put(_data)
        mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
        mt.run()
  3578. def start_flow_dumplicate(self):
  3579. schedule = BlockingScheduler()
  3580. schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
  3581. schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
  3582. schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
  3583. schedule.add_job(self.flow_remove,"cron",hour="20")
  3584. schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
  3585. # schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
  3586. schedule.start()
  3587. def changeSaveStatus(self,list_dict):
  3588. for _dict in list_dict:
  3589. if isinstance(_dict,dict):
  3590. if _dict.get(document_tmp_save,1)==1:
  3591. _d = {"partitionkey":_dict["partitionkey"],
  3592. "docid":_dict["docid"],
  3593. document_tmp_save:0
  3594. }
  3595. _d_tmp = Document_tmp(_d)
  3596. if _d_tmp.exists_row(self.ots_client):
  3597. _d_tmp.update_row(self.ots_client)
  3598. elif isinstance(_dict,int):
  3599. _d = {"partitionkey":_dict%500+1,
  3600. "docid":_dict,
  3601. document_tmp_save:0
  3602. }
  3603. _d_tmp = Document_tmp(_d)
  3604. if _d_tmp.fix_columns(self.ots_client,["status"],True):
  3605. if _d_tmp.getProperties().get("status")==1:
  3606. _d_tmp.setValue("status",0,True)
  3607. _d_tmp.update_row(self.ots_client)
  3608. def test_dumplicate(self,docid):
  3609. # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
  3610. columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]
  3611. bool_query = BoolQuery(must_queries=[
  3612. TermQuery("docid",docid)
  3613. ])
  3614. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  3615. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3616. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3617. log("flow_dumplicate producer total_count:%d"%total_count)
  3618. if total_count==0:
  3619. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  3620. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  3621. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  3622. list_dict = getRow_ots(rows)
  3623. for item in list_dict:
  3624. self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
  3625. return
  3626. def test_merge(self,list_docid_less,list_docid_greater):
  3627. list_docs_less = self.search_docs(list_docid_less)
  3628. list_projects_less = self.generate_projects_from_document(list_docs_less)
  3629. list_docs_greater = self.search_docs(list_docid_greater)
  3630. list_projects_greater = self.generate_projects_from_document(list_docs_greater)
  3631. list_projects_less.extend(list_projects_greater)
  3632. list_projects = dumplicate_projects(list_projects_less,b_log=True)
  3633. project_json = to_project_json(list_projects)
  3634. log("project_json:%s"%project_json)
  3635. return project_json
    def getRemainDoc(self,docid):
        '''
        Re-run the dedup check for a single docid and return the docid that should
        be kept (the "best" one) among the document and its duplicates.

        :param docid: docid to look up in the document table
        :return: the best docid, or None when the document cannot be found
        '''
        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
        bool_query = BoolQuery(must_queries=[
            TermQuery("docid",docid)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        if len(list_dict)>0:
            item = list_dict[0]
            start_time = time.time()
            self.post_extract(item)
            base_list = []
            set_docid = set()
            # same rule-batching pattern as dumplicate_comsumer_handle
            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,to_log=True)
            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
            _i = 0
            step = 5
            # the document itself always wins confidence comparisons
            item["confidence"] = 999
            if item.get(document_tmp_docid) not in set_docid:
                base_list.append(item)
                set_docid.add(item.get(document_tmp_docid))
            while _i<len(list_rules):
                must_not_q = []
                if len(base_list)>0:
                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                   must_not_queries=must_not_q)
                _rule = list_rules[_i]
                confidence = _rule["confidence"]
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
                _i += step
            _time = time.time()
            log("%d start final check with length:%d"%(item["docid"],len(base_list)))
            final_list = self.dumplicate_fianl_check(base_list)
            log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
            return best_docid
        return None
  3679. if __name__ == '__main__':
  3680. # df = Dataflow()
  3681. # df.flow_init()
  3682. # df.flow_test()
  3683. # df.test_merge()
  3684. # df.start_flow_attachment()
  3685. # df.start_flow_extract()
  3686. # df.start_flow_dumplicate()
  3687. # # df.start_flow_merge()
  3688. # df.start_flow_remove()
  3689. # download_attachment()
  3690. # test_attachment_interface()
  3691. df_dump = Dataflow_dumplicate(start_delete_listener=False)
  3692. # df_dump.start_flow_dumplicate()
  3693. a = time.time()
  3694. df_dump.test_dumplicate(386161571
  3695. )
  3696. # df_dump.test_merge([385521167
  3697. # ],[385521113])
  3698. # df_dump.flow_remove_project_tmp()
  3699. print("takes",time.time()-a)
  3700. # df_dump.fix_doc_which_not_in_project()
  3701. # df_dump.delete_projects_by_document(16288036)
  3702. # log("=======")
  3703. # for i in range(3):
  3704. # time.sleep(20)
  3705. #
  3706. # a = {"docid":74295123}
  3707. # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)