- # sys.path.append("/data")
- from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.common.multiProcess import MultiHandler
- from queue import Queue
- from multiprocessing import Queue as PQueue
- from BaseDataMaintenance.model.ots.document_tmp import *
- from BaseDataMaintenance.model.ots.attachment import *
- from BaseDataMaintenance.model.ots.document_html import *
- from BaseDataMaintenance.model.ots.document_extract2 import *
- from BaseDataMaintenance.model.ots.project import *
- from BaseDataMaintenance.model.ots.project2_tmp import *
- from BaseDataMaintenance.model.ots.document import *
- from BaseDataMaintenance.model.ots.project_process import *
- import base64
- from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
- from uuid import uuid4
- from BaseDataMaintenance.common.ossUtils import *
- from BaseDataMaintenance.dataSource.source import is_internal,getAuth
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.maintenance.dataflow_settings import *
- from threading import Thread
- import oss2
- from BaseDataMaintenance.maxcompute.documentDumplicate import *
- from BaseDataMaintenance.maxcompute.documentMerge import *
- from BaseDataMaintenance.common.otsUtils import *
- from BaseDataMaintenance.common.activateMQUtils import *
- from BaseDataMaintenance.dataMonitor.data_monitor import BaseDataMonitor
- from BaseDataMaintenance.dataSource.pool import ConnectorPool
- def getSet(list_dict,key):
- _set = set()
- for item in list_dict:
- if key in item:
- if item[key]!='' and item[key] is not None:
- if re.search("^\d[\d\.]*$",item[key]) is not None:
- _set.add(str(float(item[key])))
- else:
- _set.add(str(item[key]))
- return _set
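- # Hedged usage sketch: values that look numeric are normalised through
- # float() before being added, so "1" and "1.0" collapse to one member:
- # getSet([{"a":"1"},{"a":"1.0"},{"a":"x"}],"a") -> {"1.0","x"}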
- def getSimilarityOfString(str1,str2):
- _set1 = set()
- _set2 = set()
- if str1 is not None:
- for i in range(1,len(str1)):
- _set1.add(str1[i-1:i+1])
- if str2 is not None:
- for i in range(1,len(str2)):
- _set2.add(str2[i-1:i+1])
- _len = max(1,min(len(_set1),len(_set2)))
- return len(_set1&_set2)/_len
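- # Character-bigram overlap: |bigrams(str1) & bigrams(str2)| divided by the
- # smaller bigram count (floored at 1), so identical strings score 1.0, e.g.
- # getSimilarityOfString("ABCD","ABCE") -> 2/3 via shared bigrams {"AB","BC"}.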
- def getDiffIndex(list_dict,key,confidence=100):
- _set = set()
- for _i in range(len(list_dict)):
- item = list_dict[_i]
- if item["confidence"]>=confidence:
- continue
- if key in item:
- if item[key]!='' and item[key] is not None:
- if re.search("^\d+(\.\d+)?$",item[key]) is not None:
- _set.add(str(float(item[key])))
- else:
- _set.add(str(item[key]))
- if len(_set)>1:
- return _i
- return len(list_dict)
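- # Returns the index at which the group first carries a second distinct value
- # for `key` (items at or above the confidence cut-off are skipped), or
- # len(list_dict) when the key stays consistent, so the caller can truncate
- # the group at that point.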
- def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
- swf_urls = []
- try:
- list_files = os.listdir(swf_dir)
- list_files.sort()
- headers = dict()
- headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
- for _file in list_files:
- swf_localpath = "%s/%s"%(swf_dir,_file)
- swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
- uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
- _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
- swf_urls.append(_url)
- os.remove(swf_localpath)
- except Exception as e:
- traceback.print_exc()
- return swf_urls
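- # Uploads each rendered SWF page image under the source object's prefix with
- # a public-read ACL and returns the public URLs; each local file is deleted
- # after upload, so the caller only has to remove the (now empty) swf_dir.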
- class Dataflow():
- def __init__(self):
- self.ots_client = getConnect_ots()
- self.queue_init = Queue()
- self.queue_attachment = Queue()
- self.queue_attachment_ocr = Queue()
- self.queue_attachment_not_ocr = Queue()
- self.list_attachment_ocr = []
- self.list_attachment_not_ocr = []
- self.queue_extract = Queue()
- self.list_extract = []
- self.queue_dumplicate = PQueue()
- self.queue_dumplicate_processed = PQueue()
- self.dumplicate_set = set()
- self.queue_merge = Queue()
- self.queue_syncho = Queue()
- self.queue_remove = Queue()
- self.queue_remove_project = Queue()
- self.attachment_rec_interface = ""
- self.ots_client_merge = getConnect_ots()
- if is_internal:
- self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
- else:
- self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
- if is_internal:
- self.extract_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
- self.industy_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
- self.other_url = "http://1255640119316927.vpc.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
- else:
- self.extract_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/content_extract"
- self.industy_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/industry_extract"
- self.other_url = "http://1255640119316927.cn-hangzhou.pai-eas.aliyuncs.com/api/predict/other_extract"
- self.header = {'Content-Type': 'application/json',"Authorization":"NzZmOWZlMmU2MGY3YmQ4MDBjM2E5MDAyZjhjNjQ0MzZlMmE0NTMwZg=="}
- self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
- self.auth = getAuth()
- oss2.defaults.connection_pool_size = 100
- oss2.defaults.multiget_num_threads = 20
- log("bucket_url:%s"%(self.bucket_url))
- self.attachment_bucket_name = "attachment-hub"
- self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
- self.current_path = os.path.dirname(__file__)
- def flow_init(self):
- def producer():
- bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- log("flow_init producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_init.put(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_init.put(_dict)
- _count += len(list_dict)
- def consumer():
- mt = MultiThreadHandler(self.queue_init,consumer_handle,None,30,1,ots_client=self.ots_client)
- mt.run()
- def consumer_handle(item,result_queue,ots_client):
- _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
- if document_tmp_dochtmlcon in item:
- item.pop(document_tmp_dochtmlcon)
- if document_tmp_doctextcon in item:
- item.pop(document_tmp_doctextcon)
- if document_tmp_attachmenttextcon in item:
- item.pop(document_tmp_attachmenttextcon)
- _status = item.get(document_tmp_status)
- new_status = None
- if _status>=201 and _status<=300:
- item[document_tmp_save] = 1
- new_status = 81
- elif _status>=401 and _status<=450:
- item[document_tmp_save] = 0
- new_status = 81
- else:
- new_status = 1
- # new_status = 1
- item[document_tmp_status] = new_status
- dtmp = Document_tmp(item)
- dhtml = Document_html({document_tmp_partitionkey:item.get(document_tmp_partitionkey),
- document_tmp_docid:item.get(document_tmp_docid),
- document_tmp_dochtmlcon:_dochtmlcon})
- dtmp.update_row(ots_client)
- dhtml.update_row(ots_client)
- producer()
- consumer()
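- # flow_init sketch: the producer pages document_index by crtime and enqueues
- # rows; 30 consumer threads strip the heavy text columns, map the old status
- # (201-300 -> save=1, 401-450 -> save=0, both to status 81, anything else to
- # status 1) and write each row back as a Document_tmp plus a Document_html
- # carrying the original dochtmlcon.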
- def getTitleFromHtml(self,filemd5,_html):
- _soup = BeautifulSoup(_html,"lxml")
- _find = _soup.find("a",attrs={"data":filemd5})
- _title = ""
- if _find is not None:
- _title = _find.get_text()
- return _title
- def getSourceLinkFromHtml(self,filemd5,_html):
- _soup = BeautifulSoup(_html,"lxml")
- _find = _soup.find("a",attrs={"filelink":filemd5})
- filelink = ""
- if _find is None:
- _find = _soup.find("img",attrs={"filelink":filemd5})
- if _find is not None:
- filelink = _find.attrs.get("src","")
- else:
- filelink = _find.attrs.get("href","")
- return filelink
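- # Both helpers mine per-attachment metadata out of the document html: the
- # title comes from the anchor tagged data=filemd5, the source link from the
- # href (or an <img>'s src) on the element tagged filelink=filemd5.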
- def request_attachment_interface(self,attach,_dochtmlcon):
- filemd5 = attach.getProperties().get(attachment_filemd5)
- _status = attach.getProperties().get(attachment_status)
- _filetype = attach.getProperties().get(attachment_filetype)
- _size = attach.getProperties().get(attachment_size)
- _path = attach.getProperties().get(attachment_path)
- _uuid = uuid4()
- objectPath = attach.getProperties().get(attachment_path)
- localpath = os.path.join(self.current_path,"download",_uuid.hex)
- docids = attach.getProperties().get(attachment_docids)
- try:
- if _size>ATTACHMENT_LARGESIZE:
- attach.setValue(attachment_status, ATTACHMENT_TOOLARGE)
- log("attachment :%s of path:%s to large"%(filemd5,_path))
- attach.update_row(self.ots_client)
- return True
- else:
- d_start_time = time.time()
- if downloadFile(self.bucket,objectPath,localpath):
- time_download = time.time()-d_start_time
- with open(localpath,"rb") as f:
- _data_base64 = base64.b64encode(f.read())
- # call the attachment recognition interface and use its result
- start_time = time.time()
- _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype,kwargs={"timeout":600})
- if _success:
- log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
- else:
- log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
- # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
- _html = ""
- return False
- swf_images = eval(swf_images)
- if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
- swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
- if len(swf_urls)==0:
- objectPath = attach.getProperties().get(attachment_path,"")
- localpath = os.path.join(self.current_path,"download/%s.swf"%(uuid4().hex))
- swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
- if not os.path.exists(swf_dir):
- os.mkdir(swf_dir)
- for _i in range(len(swf_images)):
- _base = swf_images[_i]
- _base = base64.b64decode(_base)
- filename = "swf_page_%d.png"%(_i)
- filepath = os.path.join(swf_dir,filename)
- with open(filepath,"wb") as f:
- f.write(_base)
- swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
- if os.path.exists(swf_dir):
- os.rmdir(swf_dir)
- attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
- if re.search("<td",_html) is not None:
- attach.setValue(attachment_has_table,1,True)
- _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
- filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
- if _file_title!="":
- attach.setValue(attachment_file_title,_file_title,True)
- if filelink!="":
- attach.setValue(attachment_file_link,filelink,True)
- attach.setValue(attachment_attachmenthtml,_html,True)
- attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
- attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
- attach.setValue(attachment_recsize,len(_html),True)
- attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
- attach.update_row(self.ots_client) # enable this update again in production
- return True
- else:
- return False
- except oss2.exceptions.NotFound as e:
- return True
- except Exception as e:
- traceback.print_exc()
- finally:
- try:
- os.remove(localpath)
- except:
- pass
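- # Return contract, as read from the branches above: True means processed or
- # safely skippable (over the size cap, or the OSS object is gone), False
- # means download or recognition failed, and an unexpected exception falls
- # through to an implicit None, which callers treat the same as False.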
- def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
- list_html = []
- swf_urls = []
- for _attach in list_attach:
- # during testing, run every attachment through
- if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
- _html = _attach.getProperties().get(attachment_attachmenthtml,"")
- if _html is None:
- _html = ""
- list_html.append(_html)
- else:
- _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
- if not _succeed:
- return False,"",[]
- _html = _attach.getProperties().get(attachment_attachmenthtml,"")
- if _html is None:
- _html = ""
- list_html.append(_html)
- if _attach.getProperties().get(attachment_filetype)=="swf":
- swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
- return True,list_html,swf_urls
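- # Aggregates per-attachment html in order, bailing out with (False,"",[]) as
- # soon as one attachment cannot be recognised, and collects swf page URLs so
- # the document record can reference the converted images.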
- def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
- set_term=set(["doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
- set_range=set(["page_time","status"]),set_phrase=set(["doctitle","project_name"])):
- list_must_queries = []
- list_must_no_queries = []
- for k,v in _dict.items():
- if k in set_match:
- if isinstance(v,str):
- l_s = []
- for s_v in v.split(","):
- l_s.append(MatchQuery(k,s_v))
- list_must_queries.append(BoolQuery(should_queries=l_s))
- elif k in set_nested:
- _v = v
- if k in ("bidding_budget","win_bid_price"):
- _v = float(_v)
- list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
- elif k in set_term:
- list_must_queries.append(TermQuery(k,v))
- elif k in set_phrase:
- list_must_queries.append(MatchPhraseQuery(k,v))
- elif k in set_range:
- if len(v)==1:
- list_must_queries.append(RangeQuery(k,v[0]))
- elif len(v)==2:
- list_must_queries.append(RangeQuery(k,v[0],v[1],True,True))
- for k,v in _dict_must_not.items():
- if k in set_match:
- if isinstance(v,str):
- l_s = []
- for s_v in v.split(","):
- l_s.append(MatchQuery(k,s_v))
- list_must_no_queries.append(BoolQuery(should_queries=l_s))
- elif k in set_nested:
- _v = v
- if k in ("bidding_budget","win_bid_price"):
- _v = float(_v)
- list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
- elif k in set_term:
- list_must_no_queries.append(TermQuery(k,v))
- elif k in set_range:
- if len(v)==1:
- list_must_no_queries.append(RangeQuery(k,v[0]))
- elif len(v)==2:
- list_must_no_queries.append(RangeQuery(k,v[0],v[1],True,True))
- return BoolQuery(must_queries=list_must_queries,must_not_queries=list_must_no_queries)
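- # Hedged sketch of how a rule dict maps onto query types (field names come
- # from the sets above; fp/d1/d2 are placeholders): match fields fan
- # comma-separated values out into a should-BoolQuery, nested fields query
- # inside sub_docs_json, and term/phrase/range fields map one-to-one, e.g.
- # q = self.generate_dumplicate_query({"fingerprint":fp,"page_time":[d1,d2]},{"save":0})
- # builds a TermQuery on fingerprint, an inclusive RangeQuery on page_time,
- # and a must_not TermQuery on save.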
- def f_decode_sub_docs_json(self, project_code,project_name,tenderee,agency,sub_docs_json):
- columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
- extract_count = 0
- if project_code is not None and project_code!="":
- extract_count += 1
- if project_name is not None and project_name!="":
- extract_count += 1
- if tenderee is not None and tenderee!="":
- extract_count += 1
- if agency is not None and agency!="":
- extract_count += 1
- if sub_docs_json is not None:
- try:
- sub_docs = json.loads(sub_docs_json)
- except Exception as e:
- sub_docs = []
- sub_docs.sort(key=lambda x:float(x.get("bidding_budget",0)),reverse=True)
- sub_docs.sort(key=lambda x:float(x.get("win_bid_price",0)),reverse=True)
- # log("==%s"%(str(sub_docs)))
- for sub_doc in sub_docs:
- for _key_sub_docs in sub_doc.keys():
- extract_count += 1
- if _key_sub_docs in columns:
- if columns[_key_sub_docs]=="" and str(sub_doc[_key_sub_docs]) not in ["","0"]:
- if _key_sub_docs in ["bidding_budget","win_bid_price"]:
- if float(sub_doc[_key_sub_docs])>0:
- columns[_key_sub_docs] = str(float(sub_doc[_key_sub_docs]))
- else:
- columns[_key_sub_docs] = str(sub_doc[_key_sub_docs])
- return columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count
- def post_extract(self,_dict):
- win_tenderer,bidding_budget,win_bid_price,extract_count = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
- _dict["win_tenderer"] = win_tenderer
- _dict["bidding_budget"] = bidding_budget
- _dict["win_bid_price"] = win_bid_price
- if "extract_count" not in _dict:
- _dict["extract_count"] = extract_count
- def get_dump_columns(self,_dict):
- docchannel = _dict.get(document_tmp_docchannel,0)
- project_code = _dict.get(document_tmp_project_code,"")
- project_name = _dict.get(document_tmp_project_name,"")
- tenderee = _dict.get(document_tmp_tenderee,"")
- agency = _dict.get(document_tmp_agency,"")
- doctitle_refine = _dict.get(document_tmp_doctitle_refine,"")
- win_tenderer = _dict.get("win_tenderer","")
- bidding_budget = _dict.get("bidding_budget","")
- if bidding_budget==0:
- bidding_budget = ""
- win_bid_price = _dict.get("win_bid_price","")
- if win_bid_price==0:
- win_bid_price = ""
- page_time = _dict.get(document_tmp_page_time,"")
- fingerprint = _dict.get(document_tmp_fingerprint,"")
- product = _dict.get(document_tmp_product,"")
- return docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product
- def f_set_docid_limitNum_contain(self,item, _split,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"]):
- flag = True
- for _key in singleNum_keys:
- if len(getSet(_split,_key))>1:
- flag = False
- break
- for _key in multiNum_keys:
- if len(getSet(_split,_key))<=1:
- flag = False
- break
- project_code = item.get("project_code","")
- for _key in notlike_keys:
- if not flag:
- break
- for _d in _split:
- _key_v = _d.get(_key,"")
- _sim = getSimilarityOfString(project_code,_key_v)
- if _sim>0.7 and _sim<1:
- flag = False
- break
- # check whether the announcements in the group contain one another
- if flag:
- if len(contain_keys)>0:
- for _key in contain_keys:
- MAX_CONTAIN_COLUMN = None
- for _d in _split:
- contain_column = _d.get(_key)
- if contain_column is not None and contain_column !="":
- if MAX_CONTAIN_COLUMN is None:
- MAX_CONTAIN_COLUMN = contain_column
- else:
- if len(MAX_CONTAIN_COLUMN)<len(contain_column):
- if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
- flag = False
- break
- MAX_CONTAIN_COLUMN = contain_column
- else:
- if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
- flag = False
- break
- if flag:
- return _split
- return []
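- # Group-level gate: keep the group only if every singleNum key has at most
- # one distinct value, every multiNum key has more than one, no project_code
- # in the group is a near-miss of the item's (0.7 < similarity < 1), and for
- # each contain key the non-empty values form a containment chain; otherwise
- # return [].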
- def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count,document_tmp_doctitle]):
- list_data = []
- if isinstance(_query,list):
- bool_query = BoolQuery(should_queries=_query)
- else:
- bool_query = _query
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=50,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.post_extract(_dict)
- _dict["confidence"] = confidence
- list_data.append(_dict)
- # _count = len(list_dict)
- # while next_token:
- # rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- # SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- # ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- # list_dict = getRow_ots(rows)
- # for _dict in list_dict:
- # self.post_extract(_dict)
- # _dict["confidence"] = confidence
- # list_data.append(_dict)
- list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
- return list_dict
- def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
- list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
- for _dict in list_dict:
- self.post_extract(_dict)
- _docid = _dict.get(document_tmp_docid)
- if _docid not in set_docid:
- base_list.append(_dict)
- set_docid.add(_docid)
- def translate_dumplicate_rules(self,status_from,item):
- docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
- if page_time=='':
- page_time = getCurrent_date("%Y-%m-%d")
- base_dict = {
- "status":[status_from[0]],
- "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
- }
- must_not_dict = {"save":0}
- list_rules = []
- singleNum_keys = ["tenderee","win_tenderer"]
- if fingerprint!="":
- _dict = {}
- confidence = 100
- _dict[document_tmp_fingerprint] = fingerprint
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "item":item,
- "query":_query,
- "singleNum_keys":[],
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
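- # Rule table pattern used below: each rule pairs a query with a confidence
- # tier - an exact fingerprint match is 100, strong code/title/party/price
- # combinations run 90-95, and looser two- or three-field combinations drop
- # to 51-81. The downstream dedup pass appears to trust higher tiers first
- # (a hedged reading; the consumer of list_rules sits outside this section).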
- if docchannel in (52,118):
- if bidding_budget!="" and tenderee!="" and project_code!="":
- confidence = 90
- _dict = {document_tmp_docchannel:docchannel,
- "bidding_budget":item.get("bidding_budget"),
- document_tmp_tenderee:item.get(document_tmp_tenderee,""),
- document_tmp_project_code:item.get(document_tmp_project_code,"")
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if doctitle_refine!="" and tenderee!="" and bidding_budget!="":
- confidence = 80
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "tenderee":tenderee,
- bidding_budget:"bidding_budget"
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and doctitle_refine!="" and agency!="" and bidding_budget!="":
- confidence = 90
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "doctitle_refine":doctitle_refine,
- "agency":agency,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and tenderee!="" and bidding_budget!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "tenderee":tenderee,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if doctitle_refine!="" and agency!="" and bidding_budget!="":
- confidence = 71
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "agency":agency,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and project_name!="" and agency!="" and bidding_budget!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "agency":agency,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- ##-- 5. Tender notice - same project code - same [project name/title] - same [tenderee/agency] - same budget (!=0) - same source count = 1
- if project_code!="" and project_name!="" and tenderee!="" and bidding_budget!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "tenderee":tenderee,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if project_code!="" and doctitle_refine!="" and tenderee!="" and bidding_budget!="":
- confidence = 71
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "doctitle_refine":doctitle_refine,
- "tenderee":tenderee,
- "bidding_budget":bidding_budget
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- #-- 4. Tender notice - same [title/project code/project name] - same [tenderee/agency] - same budget - source count > 1
- if project_name!="" and agency!="":
- tmp_bidding = 0
- if bidding_budget!="":
- tmp_bidding = bidding_budget
- confidence = 51
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "agency":agency,
- "bidding_budget":tmp_bidding
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- #-- 4. Tender notice - same [title/project code/project name] - same [tenderee/agency] - same budget - source count > 1
- if project_code!="" and agency!="":
- tmp_bidding = 0
- if bidding_budget!="":
- tmp_bidding = bidding_budget
- confidence = 51
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "agency":agency,
- "bidding_budget":tmp_bidding
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if docchannel not in (101,119,120):
- #-- 7. Non-award notices - same project name - same publish date - same tenderee - same budget - same channel - source count > 1 - same project code
- if project_name!="" and tenderee!="" and project_code!="":
- tmp_bidding = 0
- if bidding_budget!="":
- tmp_bidding = bidding_budget
- confidence = 51
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "tenderee":tenderee,
- "project_code":project_code
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if docchannel in (101,119,120):
- #-- 3. Award notice - same project code - same [project name/title] - same winning bidder - same winning price (==0)
- if project_code!="" and project_name!="" and win_tenderer!="":
- tmp_win = 0
- if win_bid_price!="":
- tmp_win = win_bid_price
- confidence = 61
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "win_tenderer":win_tenderer,
- "win_bid_price":tmp_win
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if project_code!="" and project_name!="" and bidding_budget!="" and product!="":
- confidence = 72
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "bidding_budget":bidding_budget,
- "product":product
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if project_code!='' and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "doctitle_refine":doctitle_refine,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- ##-- 2. Award notice - same project code - same [project name/title] - same winning bidder - same winning price (!=0) - same source count = 1
- if project_code!="" and project_name!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "project_name":project_name,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if project_name!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_code!="" and doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
- confidence = 91
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "doctitle_refine":doctitle_refine,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- n_singleKeys = [i for i in singleNum_keys]
- n_singleKeys.append(document_tmp_web_source_no)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":n_singleKeys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if doctitle_refine!="" and win_tenderer!="" and win_bid_price!="":
- confidence=90
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_name!="" and win_tenderer!="" and win_bid_price!="" and project_code!="":
- confidence=95
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- "project_code":project_code
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if docchannel in (51,103,115,116):
- #-- 9. Channels ['公告变更','拍卖出让','土地矿产','招标答疑'] (change notice, auction, land/mining, tender clarification) - same [title/project code/project name] - same [tenderee/agency] - same budget - same day - different sources
- if doctitle_refine!="" and tenderee!="":
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- confidence=81
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "tenderee":tenderee,
- "bidding_budget":tmp_budget,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- #-- 9. Channels ['公告变更','拍卖出让','土地矿产','招标答疑'] (change notice, auction, land/mining, tender clarification) - same [title/project code/project name] - same [tenderee/agency] - same budget - same day - different sources
- if project_code!="" and tenderee!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "project_code":project_code,
- "tenderee":tenderee,
- "bidding_budget":tmp_budget,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if project_name!="" and tenderee!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "project_name":project_name,
- "tenderee":tenderee,
- "bidding_budget":tmp_budget,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if agency!="" and tenderee!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "agency":agency,
- "tenderee":tenderee,
- "bidding_budget":tmp_budget,
- "product":product
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if agency!="" and project_code!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "agency":agency,
- "project_code":project_code,
- "bidding_budget":tmp_budget,
- "product":product
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- if agency!="" and project_name!="":
- confidence=81
- tmp_budget = 0
- if bidding_budget!="":
- tmp_budget = bidding_budget
- _dict = {document_tmp_docchannel:docchannel,
- "agency":agency,
- "project_name":project_name,
- "bidding_budget":tmp_budget,
- "product":product
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[document_tmp_web_source_no]}
- list_rules.append(_rule)
- # any two of the five key fields
- if tenderee!="" and bidding_budget!="" and product!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "tenderee":tenderee,
- "bidding_budget":bidding_budget,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if tenderee!="" and win_tenderer!="" and product!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "tenderee":tenderee,
- "win_tenderer":win_tenderer,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if tenderee!="" and win_bid_price!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "tenderee":tenderee,
- "win_bid_price":win_bid_price,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if tenderee!="" and agency!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "tenderee":tenderee,
- "agency":agency,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if win_tenderer!="" and bidding_budget!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "win_tenderer":win_tenderer,
- "bidding_budget":bidding_budget,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if win_bid_price!="" and bidding_budget!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if agency!="" and bidding_budget!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "agency":agency,
- "bidding_budget":bidding_budget,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if win_tenderer!="" and win_bid_price!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if win_tenderer!="" and agency!="":
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "win_tenderer":win_tenderer,
- "agency":agency,
- "product":product,
- }
- _dict.update(base_dict)
- _dict["page_time"] = [page_time,page_time]
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- if doctitle_refine!="" and product!="" and len(doctitle_refine)>7:
- confidence=80
- _dict = {document_tmp_docchannel:docchannel,
- "doctitle_refine":doctitle_refine,
- "product":product,
- }
- _dict.update(base_dict)
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "query":_query,
- "singleNum_keys":singleNum_keys,
- "contain_keys":[],
- "multiNum_keys":[]}
- list_rules.append(_rule)
- return list_rules
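- # Reading of the rules above: pairings of identifying fields (project_code,
- # project_name, tenderee, agency) emit confidence-81 rules, pinning
- # bidding_budget when one was extracted; "two of five" combinations of
- # tenderee/agency/win_tenderer/bidding_budget/win_bid_price (usually with
- # product) emit confidence-80 rules; the weakest rule matches a long
- # doctitle_refine plus product. The caller sorts list_rules by confidence
- # descending, so higher-confidence queries run first.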
- def dumplicate_fianl_check(self,base_list):
- the_group = base_list
- the_group.sort(key=lambda x:x["confidence"],reverse=True)
- if len(the_group)>10:
- keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
- else:
- keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
- # confidence cutoff per key
- list_key_index = []
- for _k in keys:
- if _k=="doctitle":
- list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
- else:
- list_key_index.append(getDiffIndex(the_group,_k))
- _index = min(list_key_index)
- if _index>1:
- return the_group[:_index]
- return []
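- # Assuming getDiffIndex(group,key) returns the first index where `key`
- # diverges from the head of the confidence-sorted group, min() keeps the
- # leading run that agrees on every key; an index of 0 or 1 means only the
- # document itself matched, hence the empty result. Note the "doctitle"
- # branch above is unreachable since keys only ever holds doctitle_refine.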
- def get_best_docid(self,base_list):
- to_reverse = False
- dict_source_count = {}
- for _item in base_list:
- _web_source = _item.get(document_tmp_web_source_no)
- _fingerprint = _item.get(document_tmp_fingerprint)
- if _web_source is not None:
- if _web_source not in dict_source_count:
- dict_source_count[_web_source] = set()
- dict_source_count[_web_source].add(_fingerprint)
- if len(dict_source_count[_web_source])>=2:
- to_reverse=True
- if len(base_list)>0:
- base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
- base_list.sort(key=lambda x:x.get(document_attachment_extract_status,0),reverse=True)
- base_list.sort(key=lambda x:x["extract_count"],reverse=True)
- return base_list[0]["docid"]
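- # Python sorts are stable, so the last sort dominates: the best docid has
- # the highest extract_count, ties broken by attachment_extract_status and
- # then the smallest docid; if any single web source contributed two or
- # more distinct fingerprints, the docid tie-break flips to largest-first.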
- def save_dumplicate(self,base_list,best_docid,status_from,status_to):
- # best_docid needs checking while the others can be saved directly
- list_dict = []
- for item in base_list:
- docid = item["docid"]
- _dict = {"partitionkey":item["partitionkey"],
- "docid":item["docid"]}
- if docid==best_docid:
- if item.get("save",1)!=0:
- _dict["save"] = 1
- else:
- _dict["save"] = 0
- if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
- _dict["status"] = random.randint(status_to[0],status_to[1])
- list_dict.append(_dict)
- for _dict in list_dict:
- dtmp = Document_tmp(_dict)
- dtmp.update_row(self.ots_client)
- def flow_test(self,status_to=[1,10]):
- def producer():
- bool_query = BoolQuery(must_queries=[
- # ExistsQuery("docid"),
- # RangeQuery("crtime",range_to='2022-04-10'),
- # RangeQuery("status",61),
- NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
- ],
- must_not_queries=[
- # NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*")),
- TermQuery("attachment_extract_status",1),
- RangeQuery("status",1,11)
- ]
- )
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
- log("flow_init producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_init.put(_dict)
- _count = len(list_dict)
- while next_token and _count<1000000:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_init.put(_dict)
- _count += len(list_dict)
- print("%d/%d"%(_count,total_count))
- def comsumer():
- mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- # print(item)
- dtmp = Document_tmp(item)
- dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
- dtmp.update_row(ots_client)
- # dhtml = Document_html(item)
- # dhtml.update_row(ots_client)
- # dtmp.delete_row(ots_client)
- # dhtml.delete_row(ots_client)
- producer()
- comsumer()
- def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
- def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_web_source_name]):
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_dumplicate.put(_dict)
- _count = len(list_dict)
- while next_token and _count<flow_process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_dumplicate.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(self.queue_dumplicate,comsumer_handle,None,10,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- self.post_extract(item)
- base_list = []
- set_docid = set()
- list_rules = self.translate_dumplicate_rules(flow_dumplicate_status_from,item)
- list_rules.sort(key=lambda x:x["confidence"],reverse=True)
- # print(item,"len_rules",len(list_rules))
- for _rule in list_rules:
- _query = _rule["query"]
- confidence = _rule["confidence"]
- singleNum_keys = _rule["singleNum_keys"]
- contain_keys = _rule["contain_keys"]
- multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys)
- item["confidence"] = 999
- if item.get(document_tmp_docid) not in set_docid:
- base_list.append(item)
- final_list = self.dumplicate_fianl_check(base_list)
- best_docid = self.get_best_docid(final_list)
- # log(str(final_list))
- _d = {"partitionkey":item["partitionkey"],
- "docid":item["docid"],
- "status":random.randint(*flow_dumplicate_status_to),
- document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- }
- dtmp = Document_tmp(_d)
- dup_docid = set()
- for _dict in final_list:
- dup_docid.add(_dict.get(document_tmp_docid))
- if item.get(document_tmp_docid) in dup_docid:
- dup_docid.remove(item.get(document_tmp_docid))
- if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
- dtmp.setValue(document_tmp_save,1,True)
- dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- else:
- dtmp.setValue(document_tmp_save,0,True)
- if best_docid in dup_docid:
- dup_docid.remove(best_docid)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- dmp_docid = "%d,%s"%(best_docid,dmp_docid)
- else:
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
- dtmp.update_row(self.ots_client)
- # only update the current announcement
- # self.save_dumplicate(final_list,best_docid,status_from,status_to)
- #
- # print("=base=",item)
- # if len(final_list)>=1:
- # print("==================")
- # for _dict in final_list:
- # print(_dict)
- # print("========>>>>>>>>>>")
- producer()
- comsumer()
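- # One dedup pass per document: candidates gathered rule by rule are checked
- # pairwise, the best docid keeps save=1 (and may gain a merge_uuid) while
- # the rest get save=0 with dup_docid pointing at the kept announcement.
- # A minimal invocation sketch (hypothetical; the base class name is assumed):
- #   flow = Dataflow()
- #   flow.flow_dumplicate()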
- def merge_document(self,item,status_to=None):
- self.post_extract(item)
- docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
- _d = {"partitionkey":item["partitionkey"],
- "docid":item["docid"],
- }
- dtmp = Document_tmp(_d)
- if item.get(document_tmp_save,1)==1:
- list_should_q = []
- if project_code!="" and tenderee!="":
- _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
- TermQuery("tenderee",tenderee)])
- list_should_q.append(_q)
- if project_name!="" and project_code!="":
- _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
- TermQuery("project_name",project_name)])
- list_should_q.append(_q)
- if len(list_should_q)>0:
- list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
- if len(list_data)==1:
- dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
- print(item["docid"],list_data[0]["uuid"])
- else:
- list_should_q = []
- if bidding_budget!="" and project_code!="":
- _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
- TermQuery("bidding_budget",float(bidding_budget))])
- list_should_q.append(_q)
- if tenderee!="" and bidding_budget!="" and project_name!="":
- _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
- TermQuery("bidding_budget",float(bidding_budget)),
- TermQuery("project_name",project_name)])
- list_should_q.append(_q)
- if tenderee!="" and win_bid_price!="" and project_name!="":
- _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
- TermQuery("win_bid_price",float(win_bid_price)),
- TermQuery("project_name",project_name)])
- list_should_q.append(_q)
- if len(list_should_q)>0:
- list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
- if len(list_data)==1:
- dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
- print(item["docid"],list_data[0]["uuid"])
- return dtmp.getProperties().get("merge_uuid","")
- # dtmp.update_row(self.ots_client)
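- # Merge strategy as implemented above: for documents kept as save=1 the
- # candidate project is looked up by (project_code AND tenderee) or
- # (project_code AND project_name); otherwise by budget/price plus code/name
- # combinations. The merge_uuid is only adopted when exactly one project
- # matches, so ambiguous candidates are deliberately left unmerged.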
- def test_merge(self):
- import pandas as pd
- import queue
- def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
- list_test_item = []
- should_q = BoolQuery(should_queries=[
- TermQuery("docchannel",101),
- TermQuery("docchannel",119),
- TermQuery("docchannel",120)
- ])
- bool_query = BoolQuery(must_queries=[
- TermQuery("page_time","2022-04-22"),
- should_q,
- TermQuery("save",1)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- list_test_item.append(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- list_test_item.append(_dict)
- _count += len(list_dict)
- print("%d/%d"%(_count,total_count))
- return list_test_item
- from BaseDataMaintenance.model.ots.project import Project
- def comsumer_handle(item,result_queue,ots_client):
- item["merge_uuid"] = self.merge_document(item)
- if item["merge_uuid"]!="":
- _dict = {"uuid":item["merge_uuid"]}
- _p = Project(_dict)
- _p.fix_columns(self.ots_client,["zhao_biao_page_time"],True)
- if _p.getProperties().get("zhao_biao_page_time","")!="":
- item["是否有招标"] = "是"
- list_test_item = producer()
- task_queue = queue.Queue()
- for item in list_test_item:
- task_queue.put(item)
- mt = MultiThreadHandler(task_queue,comsumer_handle,None,30,1,ots_client=self.ots_client)
- mt.run()
- keys = [document_tmp_docid,document_tmp_docchannel,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_doctitle_refine,"win_tenderer","bidding_budget","win_bid_price","merge_uuid","是否有招标"]
- df_data = {}
- for k in keys:
- df_data[k] = []
- for item in list_test_item:
- for k in keys:
- df_data[k].append(item.get(k,""))
- df = pd.DataFrame(df_data)
- df.to_excel("test_merge.xlsx",columns=keys)
- def flow_merge(self,process_count=10000,status_from=[71,80],status_to=[81,90]):
- def producer(columns=[document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_merge producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_merge.put(_dict)
- _count = len(list_dict)
- while next_token and _count<process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_merge.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(self.queue_merge,comsumer_handle,None,10,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- self.merge_document(item,status_to)
- # producer()
- # comsumer()
- pass
- def flow_syncho(self,status_from=[71,80],status_to=[81,90]):
- pass
- def flow_remove(self,process_count=flow_process_count,status_from=flow_remove_status_from):
- def producer():
- current_date = getCurrent_date("%Y-%m-%d")
- tmp_date = timeAdd(current_date,-10)
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
- RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- log("flow_remove producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_remove.put(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_remove.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(self.queue_remove,comsumer_handle,None,10,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- dtmp = Document_tmp(item)
- dtmp.delete_row(self.ots_client)
- dhtml = Document_html(item)
- dhtml.delete_row(self.ots_client)
- producer()
- comsumer()
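- # Cleanup pass: rows still in flow_remove_status_from whose crtime is more
- # than ten days old are deleted from both document_tmp and document_html.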
- def start_flow_dumplicate(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_remove,"cron",hour="20")
- schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
- schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
- schedule.start()
- def flow_remove_project_tmp(self,process_count=flow_process_count):
- def producer():
- current_date = getCurrent_date("%Y-%m-%d")
- tmp_date = timeAdd(current_date,-6*31)
- bool_query = BoolQuery(must_queries=[
- RangeQuery(project_page_time,range_to="%s"%(tmp_date))])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2_tmp","project2_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- log("flow_remove project2_tmp producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_remove_project.put(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- self.queue_remove_project.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(self.queue_remove_project,comsumer_handle,None,10,1,ots_client=self.ots_client)
- mt.run()
- def comsumer_handle(item,result_queue,ots_client):
- ptmp = Project_tmp(item)
- ptmp.delete_row(self.ots_client)
- producer()
- comsumer()
- def start_flow_merge(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_merge,"cron",second="*/10")
- schedule.start()
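- # Hypothetical launcher sketch for the schedulers above (assumed entry
- # point; the real one lives outside this listing):
- #   if __name__=="__main__":
- #       Dataflow_dumplicate().start_flow_dumplicate()  # dedup every 10s, cleanup at 20:00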
- def download_attachment():
- ots_client = getConnect_ots()
- queue_attachment = Queue()
- auth = getAuth()
- oss2.defaults.connection_pool_size = 100
- oss2.defaults.multiget_num_threads = 20
- attachment_bucket_name = "attachment-hub"
- if is_internal:
- bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
- else:
- bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
- bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
- current_path = os.path.dirname(__file__)
- def producer():
- columns = [document_tmp_attachment_path]
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_crtime,"2022-03-29 15:00:00","2022-03-29 17:00:00",True,True)])
- rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_attachment producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- queue_attachment.put(_dict)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- queue_attachment.put(_dict)
- _count += len(list_dict)
- def comsumer():
- mt = MultiThreadHandler(queue_attachment,comsumer_handle,None,10,1)
- mt.run()
- def getAttachments(list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
- list_attachment = []
- rows_to_get = []
- for _md5 in list_filemd5[:50]:
- if _md5 is None:
- continue
- primary_key = [(attachment_filemd5,_md5)]
- rows_to_get.append(primary_key)
- req = BatchGetRowRequest()
- req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
- try:
- result = ots_client.batch_get_row(req)
- attach_result = result.get_result_by_table(attachment_table_name)
- for item in attach_result:
- if item.is_ok:
- _dict = getRow_ots_primary(item.row)
- if _dict is not None:
- list_attachment.append(attachment(_dict))
- except Exception as e:
- log(str(list_filemd5))
- log("attachProcess comsumer error %s"%str(e))
- return list_attachment
- def comsumer_handle(item,result_queue):
- page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
- if len(page_attachments)==0:
- pass
- else:
- list_fileMd5 = []
- for _atta in page_attachments:
- list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
- list_attach = getAttachments(list_fileMd5)
- for attach in list_attach:
- filemd5 = attach.getProperties().get(attachment_filemd5)
- _status = attach.getProperties().get(attachment_status)
- _filetype = attach.getProperties().get(attachment_filetype)
- _size = attach.getProperties().get(attachment_size)
- _path = attach.getProperties().get(attachment_path)
- _uuid = uuid4()
- objectPath = attach.getProperties().get(attachment_path)
- localpath = os.path.join(current_path,"download","%s.%s"%(filemd5,_filetype))
- try:
- if _size>ATTACHMENT_LARGESIZE:
- pass
- else:
- downloadFile(bucket,objectPath,localpath)
- except Exception as e:
- traceback.print_exc()
- producer()
- comsumer()
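- # Backfill helper: attachment metadata is batch-read by fileMd5, and files
- # below ATTACHMENT_LARGESIZE are fetched from the attachment-hub OSS bucket
- # into ./download/<md5>.<filetype> for later interface testing.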
- def test_attachment_interface():
- current_path = os.path.dirname(__file__)
- task_queue = Queue()
- def producer():
- _count = 0
- list_filename = os.listdir(os.path.join(current_path,"download"))
- for _filename in list_filename:
- _count += 1
- _type = _filename.split(".")[1]
- task_queue.put({"path":os.path.join(current_path,"download",_filename),"file_type":_type})
- if _count>=500:
- break
- def comsumer():
- mt = MultiThreadHandler(task_queue,comsumer_handle,None,10)
- mt.run()
- def comsumer_handle(item,result_queue):
- _path = item.get("path")
- _type = item.get("file_type")
- _data_base64 = base64.b64encode(open(_path,"rb").read())
- # call the attachment interface and record the result
- start_time = time.time()
- _success,_html,swf_images = getAttachDealInterface(_data_base64,_type)
- log("%s result:%s takes:%d"%(_path,str(_success),time.time()-start_time))
- producer()
- comsumer()
- class Dataflow_attachment(Dataflow):
- def __init__(self):
- Dataflow.__init__(self)
- self.process_list_thread = []
- def flow_attachment_process(self):
- self.process_comsumer()
- def monitor_attachment_process(self):
- alive_count = 0
- for _t in self.process_list_thread:
- if _t.is_alive():
- alive_count += 1
- log("attachment_process alive:%d total:%d"%(alive_count,len(self.process_list_thread)))
- def process_comsumer(self):
- if len(self.process_list_thread)==0:
- thread_count = 60
- for i in range(thread_count):
- self.process_list_thread.append(Thread(target=self.process_comsumer_handle))
- for t in self.process_list_thread:
- t.start()
- while 1:
- failed_count = 0
- for _i in range(len(self.process_list_thread)):
- t = self.process_list_thread[_i]
- if not t.is_alive():
- failed_count += 1
- self.process_list_thread[_i] = Thread(target=self.process_comsumer_handle)
- self.process_list_thread[_i].start()
- if failed_count>0:
- log("attachment failed %d"%(failed_count))
- time.sleep(5)
- def process_comsumer_handle(self):
- while 1:
- _flag = False
- log("attachment handle:%s"%str(threading.get_ident()))
- try:
- item = self.queue_attachment_ocr.get(True,timeout=0.2)
- log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
- self.attachment_recognize(item,None)
- log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
- except Exception as e:
- _flag = True
- pass
- try:
- item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
- log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
- self.attachment_recognize(item,None)
- log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
- except Exception as e:
- _flag = True and _flag
- pass
- if _flag:
- time.sleep(2)
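- # Polling pattern: each worker takes at most one item from the OCR queue
- # and then one from the non-OCR queue per loop, each with a 0.2s blocking
- # get, sleeping two seconds only when both queues were empty.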
- def attachment_recognize(self,_dict,result_queue):
- item = _dict.get("item")
- list_attach = _dict.get("list_attach")
- dhtml = Document_html({"partitionkey":item.get("partitionkey"),
- "docid":item.get("docid")})
- dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
- _dochtmlcon = dhtml.getProperties().get("dochtmlcon","")
- _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
- log(str(swf_urls))
- if not _succeed:
- item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
- else:
- dhtml.updateSWFImages(swf_urls)
- dhtml.updateAttachment(list_html)
- dhtml.update_row(self.ots_client)
- item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
- item[document_tmp_attachment_extract_status] = 1
- log("document:%d get attachments with result:%s"%(item.get("docid"),str(_succeed)))
- dtmp = Document_tmp(item)
- dtmp.update_row(self.ots_client)
- def flow_attachment(self):
- self.flow_attachment_producer()
- self.flow_attachment_producer_comsumer()
- def getAttachments(self,list_filemd5,columns_to_get=[attachment_filemd5,attachment_path,attachment_size,attachment_attachmenthtml,attachment_filetype,attachment_docids,attachment_status,attachment_swfUrls]):
- list_attachment = []
- rows_to_get = []
- for _md5 in list_filemd5[:50]:
- if _md5 is None:
- continue
- primary_key = [(attachment_filemd5,_md5)]
- rows_to_get.append(primary_key)
- req = BatchGetRowRequest()
- req.add(TableInBatchGetRowItem(attachment_table_name,rows_to_get,columns_to_get,None,1))
- try:
- result = self.ots_client.batch_get_row(req)
- attach_result = result.get_result_by_table(attachment_table_name)
- for item in attach_result:
- if item.is_ok:
- _dict = getRow_ots_primary(item.row)
- if _dict is not None:
- list_attachment.append(attachment(_dict))
- except Exception as e:
- log(str(list_filemd5))
- log("attachProcess comsumer error %s"%str(e))
- return list_attachment
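- # Only the first 50 md5s are fetched per call, presumably to stay within
- # Tablestore's BatchGetRow per-request row limit; callers pass the fileMd5
- # list of a single document, which is normally far smaller than that.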
- def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
- qsize_ocr = self.queue_attachment_ocr.qsize()
- qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
- log("queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(qsize_ocr,qsize_not_ocr))
- # decide whether more data should be enqueued
- if min(qsize_ocr,qsize_not_ocr)>200 or max(qsize_ocr,qsize_not_ocr)>1000:
- return
- # dedup against docids already in the queues
- set_docid = set()
- set_docid = set_docid | set(self.list_attachment_ocr) | set(self.list_attachment_not_ocr)
- if qsize_ocr>0:
- self.list_attachment_ocr = self.list_attachment_ocr[-qsize_ocr:]
- else:
- self.list_attachment_ocr = []
- if qsize_not_ocr>0:
- self.list_attachment_not_ocr = self.list_attachment_not_ocr[-qsize_not_ocr:]
- else:
- self.list_attachment_not_ocr = []
- try:
- bool_query = BoolQuery(must_queries=[
- RangeQuery(document_tmp_status,*flow_attachment_status_from,True,True),
- # TermQuery(document_tmp_docid,234925191),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.DESC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_attachment producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- _count = 0
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in set_docid:
- continue
- self.queue_attachment.put(_dict,True)
- _count += 1
- while next_token and _count<flow_process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in set_docid:
- continue
- self.queue_attachment.put(_dict,True)
- _count += 1
- log("add attachment count:%d"%(_count))
- except Exception as e:
- log("flow attachment producer error:%s"%(str(e)))
- traceback.print_exc()
- def flow_attachment_producer_comsumer(self):
- log("start flow_attachment comsumer")
- mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
- mt.run()
- def set_queue(self,_dict):
- list_attach = _dict.get("list_attach")
- to_ocr = False
- for attach in list_attach:
- if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
- to_ocr = True
- break
- if to_ocr:
- self.queue_attachment_ocr.put(_dict,True)
- # self.list_attachment_ocr.append(_dict.get("item").get(document_tmp_docid))
- else:
- self.queue_attachment_not_ocr.put(_dict,True)
- # self.list_attachment_not_ocr.append(_dict.get("item").get(document_tmp_docid))
- def comsumer_handle(self,item,result_queue):
- try:
- page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
- if len(page_attachments)==0:
- item[document_tmp_status] = random.randint(*flow_attachment_status_succeed_to)
- dtmp = Document_tmp(item)
- dtmp.update_row(self.ots_client)
- else:
- list_fileMd5 = []
- for _atta in page_attachments:
- list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
- list_attach = self.getAttachments(list_fileMd5)
- # if some attachments are not uploaded yet, postpone processing for up to 2 hours after crtime
- if len(page_attachments)!=len(list_attach) and time.mktime(time.localtime())-time.mktime(time.strptime(item.get(document_tmp_crtime),"%Y-%m-%d %H:%M:%S"))<7200:
- item[document_tmp_status] = 1
- dtmp = Document_tmp(item)
- dtmp.update_row(self.ots_client)
- return
- self.set_queue({"item":item,"list_attach":list_attach})
- except Exception as e:
- traceback.print_exc()
- def start_flow_attachment(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
- schedule.add_job(self.flow_attachment,"cron",second="*/10")
- schedule.start()
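- # Two cooperating jobs: flow_attachment (every 10s) pulls new rows and
- # routes them into the ocr/not_ocr queues, while flow_attachment_process
- # (every 20s) keeps the pool of 60 consumer threads alive, restarting any
- # that died.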
- class Dataflow_extract(Dataflow):
- def __init__(self):
- Dataflow.__init__(self)
- def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
- q_size = self.queue_extract.qsize()
- if q_size>100:
- return
- set_docid = set(self.list_extract)
- if q_size>0:
- self.list_extract = self.list_extract[-q_size:]
- else:
- self.list_extract = []
- try:
- bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*flow_extract_status_from,True,True)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status",SortOrder.ASC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_extract producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in set_docid:
- self.list_extract.insert(0,docid)
- continue
- else:
- self.queue_extract.put(_dict)
- self.list_extract.append(docid)
- _count = len(list_dict)
- while next_token and _count<flow_process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in set_docid:
- self.list_extract.insert(0,docid)
- continue
- else:
- self.queue_extract.put(_dict)
- self.list_extract.append(docid)
- _count += len(list_dict)
- except Exception as e:
- log("flow extract producer error:%s"%(str(e)))
- traceback.print_exc()
- def flow_extract(self,):
- self.comsumer()
- def comsumer(self):
- mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,35,1,True)
- mt.run()
- def comsumer_handle(self,item,result_queue):
- dhtml = Document_html({"partitionkey":item.get("partitionkey"),
- "docid":item.get("docid")})
- dhtml.fix_columns(self.ots_client,["dochtmlcon"],True)
- item[document_tmp_dochtmlcon] = dhtml.getProperties().get(document_tmp_dochtmlcon,"")
- _extract = Document_extract({})
- _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
- _extract.setValue(document_extract2_docid,item.get(document_docid))
- all_done = 1
- if all_done:
- data = item
- resp = requests.post(self.other_url,json=data,headers=self.header)
- if (resp.status_code >=200 and resp.status_code<=210):
- _extract.setValue(document_extract2_other_json,resp.content.decode("utf8"),True)
- else:
- all_done = -1
- data = {}
- for k,v in item.items():
- data[k] = v
- data["timeout"] = 240
- data["doc_id"] = data.get(document_tmp_docid)
- data["content"] = data.get(document_tmp_dochtmlcon,"")
- if document_tmp_dochtmlcon in data:
- data.pop(document_tmp_dochtmlcon)
- data["title"] = data.get(document_tmp_doctitle,"")
- data["web_source_no"] = item.get(document_tmp_web_source_no,"")
- data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
- if all_done:
- resp = requests.post(self.extract_url,json=data,headers=self.header)
- if (resp.status_code >=200 and resp.status_code<=210):
- _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
- else:
- all_done = -2
- if all_done:
- resp = requests.post(self.industy_url,json=data,headers=self.header)
- if (resp.status_code >=200 and resp.status_code<=210):
- _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
- else:
- all_done = -3
- _dict = {document_partitionkey:item.get(document_tmp_partitionkey),
- document_docid:item.get(document_tmp_docid),
- }
- dtmp = Document_tmp(_dict)
- if all_done!=1:
- sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_failed_to),True)
- dtmp.update_row(self.ots_client)
- else:
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
- dtmp.update_row(self.ots_client)
- # write to the interface table; enable when going live
- _extract.setValue(document_extract2_status,random.randint(1,50),True)
- _extract.update_row(self.ots_client)
- log("process docid:%d %s"%(data["doc_id"],str(all_done)))
- def start_flow_extract(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_extract_producer,"cron",second="*/10")
- schedule.add_job(self.flow_extract,"cron",second="*/10")
- schedule.start()
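- # The extract consumer posts to three services in turn (other_url,
- # extract_url, industy_url); all_done falls to -1/-2/-3 depending on which
- # call failed, which triggers a DingTalk alert via sentMsgToDD and
- # re-queues the document into the failed status range.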
- class Dataflow_dumplicate(Dataflow):
- class DeleteListener():
- def __init__(self,conn,_func,*args,**kwargs):
- self.conn = conn
- self._func = _func
- def on_error(self, headers,*args,**kwargs):
- log('received an error %s' % str(headers.body))
- def on_message(self, headers,*args,**kwargs):
- try:
- message_id = headers.headers["message-id"]
- body = headers.body
- log("get message %s"%(message_id))
- self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
- except Exception as e:
- traceback.print_exc()
- pass
- def __del__(self):
- self.conn.disconnect()
- def __init__(self,start_delete_listener=True):
- Dataflow.__init__(self,)
- self.c_f_get_extractCount = f_get_extractCount()
- self.c_f_get_package = f_get_package()
- logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- self.fix_doc_docid = None
- self.bdm = BaseDataMonitor()
- self.check_rule = 1
- if start_delete_listener:
- self.delete_comsumer_counts = 2
- self.doc_delete_queue = "/queue/doc_delete_queue"
- self.doc_delete_result = "/queue/doc_delete_result"
- self.pool_mq_ali = ConnectorPool(1,10,getConnect_activateMQ_ali)
- for _ in range(self.delete_comsumer_counts):
- conn = getConnect_activateMQ_ali()
- listener = self.DeleteListener(conn,self.delete_doc_handle)
- createComsumer(listener,self.doc_delete_queue)
- def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart"]):
- dict_time = {}
- for k in keys:
- dict_time[k] = _extract.get(k)
- return dict_time
- def get_attrs_before_dump(self,docid,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
- bool_query = BoolQuery(must_queries=[
- TermQuery("docid",docid)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- if total_count==0:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- if len(list_dict)>0:
- return self.post_extract(list_dict[0])
- def post_extract(self,_dict):
- win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
- _dict["win_tenderer"] = win_tenderer
- _dict["bidding_budget"] = bidding_budget
- _dict["win_bid_price"] = win_bid_price
- extract_json = _dict.get(document_tmp_extract_json,"{}")
- _extract = json.loads(extract_json)
- _dict["product"] = ",".join(_extract.get("product",[]))
- _dict["fingerprint"] = _extract.get("fingerprint","")
- _dict["project_codes"] = _extract.get("code",[])
- if len(_dict["project_codes"])>0:
- _dict["project_code"] = _dict["project_codes"][0]
- else:
- _dict["project_code"] = ""
- _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
- if _dict["doctitle_refine"]=="":
- _dict["doctitle_refine"] = _dict.get("doctitle")
- _dict["moneys"] = set(_extract.get("moneys",[]))
- _dict["moneys_attachment"] = set(_extract.get("moneys_attachment",[]))
- _dict["nlp_enterprise"] = json.dumps({"indoctextcon":_extract.get("nlp_enterprise",[]),
- "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])},ensure_ascii=False)
- _dict["extract_count"] = _extract.get("extract_count",0)
- _dict["package"] = self.c_f_get_package.evaluate(extract_json)
- _dict["project_name"] = _extract.get("name","")
- _dict["dict_time"] = self.get_dict_time(_extract)
- _dict["punish"] = _extract.get("punish",{})
- _dict["approval"] = _extract.get("approval",[])
- return _dict
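- # post_extract flattens extract_json into the fields the dedup rules key
- # on: win_tenderer/bidding_budget/win_bid_price from sub_docs_json, the
- # first extracted code as project_code, doctitle_refine (falling back to
- # doctitle), money sets, serialized nlp_enterprise, extract_count, package,
- # project_name, the time dict, punish and approval.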
- def dumplicate_fianl_check(self,base_list,b_log=False):
- the_group = base_list
- the_group.sort(key=lambda x:x["confidence"],reverse=True)
- _index = 0
- base_fingerprint = "None"
- if len(base_list)>0:
- base_fingerprint = base_list[0]["fingerprint"]
- final_group = []
- for _i in range(len(base_list)):
- _dict1 = base_list[_i]
- fingerprint_less = _dict1["fingerprint"]
- _pass = True
- if fingerprint_less==base_fingerprint:
- _index = _i
- final_group.append(_dict1)
- continue
- for _dict2 in final_group:
- _prob,day_dis = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
- if _prob<=0.1:
- _pass = False
- break
- log("checking index:%d %s %.2f"%(_i,str(_pass),_prob))
- _index = _i
- if _pass:
- final_group.append(_dict1)
- else:
- break
- return final_group
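- # Unlike the index-based variant above, this check is pairwise: every
- # candidate must score above 0.1 against each member already accepted into
- # final_group, and iteration stops at the first rejection because the list
- # is sorted by confidence descending.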
- def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
- document_less = _dict1
- docid_less = _dict1["docid"]
- docchannel_less = document_less.get("docchannel",0)
- page_time_less = document_less.get("page_time")
- doctitle_refine_less = document_less["doctitle_refine"]
- project_codes_less = document_less.get("project_codes")
- nlp_enterprise_less = document_less["nlp_enterprise"]
- tenderee_less = document_less.get("tenderee","")
- agency_less = document_less.get("agency")
- win_tenderer_less = document_less["win_tenderer"]
- bidding_budget_less = document_less["bidding_budget"]
- win_bid_price_less = document_less["win_bid_price"]
- product_less = document_less.get("product")
- package_less = document_less.get("package")
- json_time_less = document_less.get("dict_time")
- project_name_less = document_less.get("project_name")
- fingerprint_less = document_less.get("fingerprint")
- extract_count_less = document_less.get("extract_count",0)
- web_source_no_less = document_less.get("web_source_no")
- province_less = document_less.get("province")
- city_less = document_less.get("city")
- district_less = document_less.get("district")
- moneys_less = document_less.get("moneys")
- moneys_attachment_less = document_less.get("moneys_attachment")
- page_attachments_less = document_less.get("page_attachments","[]")
- punish_less = document_less.get("punish",{})
- approval_less = document_less.get("approval",[])
- source_type_less = document_less.get("source_type")
- document_greater = _dict2
- docid_greater = _dict2["docid"]
- page_time_greater = document_greater["page_time"]
- docchannel_greater = document_greater.get("docchannel",0)
- doctitle_refine_greater = document_greater.get("doctitle_refine","")
- project_codes_greater = document_greater["project_codes"]
- nlp_enterprise_greater = document_greater["nlp_enterprise"]
- tenderee_greater = document_greater.get("tenderee","")
- agency_greater = document_greater.get("agency","")
- win_tenderer_greater = document_greater["win_tenderer"]
- bidding_budget_greater = document_greater["bidding_budget"]
- win_bid_price_greater = document_greater["win_bid_price"]
- product_greater = document_greater.get("product")
- package_greater = document_greater.get("package")
- json_time_greater = document_greater["dict_time"]
- project_name_greater = document_greater.get("project_name")
- fingerprint_greater = document_greater.get("fingerprint")
- extract_count_greater = document_greater.get("extract_count",0)
- web_source_no_greater = document_greater.get("web_source_no")
- province_greater = document_greater.get("province")
- city_greater = document_greater.get("city")
- district_greater = document_greater.get("district")
- moneys_greater = document_greater.get("moneys")
- moneys_attachment_greater = document_greater.get("moneys_attachment")
- page_attachments_greater = document_greater.get("page_attachments","[]")
- punish_greater = document_greater.get("punish",{})
- approval_greater = document_greater.get("approval",[])
- source_type_greater = document_greater.get("source_type")
- hard_level=1
- if docchannel_less==docchannel_greater==302:
- hard_level=2
- if web_source_no_less==web_source_no_greater=="17397-3":
- hard_level=2
- if self.check_rule==1:
- _prob = check_dumplicate_rule(document_less,document_greater,min_counts,b_log=b_log,hard_level=hard_level)
- else:
- _prob = check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
- pagetime_stamp_less = getTimeStamp(page_time_less)
- pagetime_stamp_greater = getTimeStamp(page_time_greater)
-
- day_dis = abs(pagetime_stamp_greater-pagetime_stamp_less)//86400
- if day_dis>7:
- _prob = 0
- elif day_dis>3:
- if _prob<0.4:
- _prob = 0
- return _prob,day_dis
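- # Time decay on top of the rule score: pairs published more than seven
- # days apart are never duplicates, and pairs four to seven days apart
- # must already score at least 0.4 to survive.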
- def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
- document_less = _dict1
- docid_less = _dict1["docid"]
- docchannel_less = document_less["docchannel"]
- page_time_less = document_less["page_time"]
- doctitle_refine_less = document_less["doctitle_refine"]
- project_codes_less = document_less["project_codes"]
- nlp_enterprise_less = document_less["nlp_enterprise"]
- tenderee_less = document_less["tenderee"]
- agency_less = document_less["agency"]
- win_tenderer_less = document_less["win_tenderer"]
- bidding_budget_less = document_less["bidding_budget"]
- win_bid_price_less = document_less["win_bid_price"]
- product_less = document_less["product"]
- package_less = document_less["package"]
- json_time_less = document_less["dict_time"]
- project_name_less = document_less["project_name"]
- fingerprint_less = document_less["fingerprint"]
- extract_count_less = document_less["extract_count"]
- document_greater = _dict2
- docid_greater = _dict2["docid"]
- page_time_greater = document_greater["page_time"]
- doctitle_refine_greater = document_greater["doctitle_refine"]
- project_codes_greater = document_greater["project_codes"]
- nlp_enterprise_greater = document_greater["nlp_enterprise"]
- tenderee_greater = document_greater["tenderee"]
- agency_greater = document_greater["agency"]
- win_tenderer_greater = document_greater["win_tenderer"]
- bidding_budget_greater = document_greater["bidding_budget"]
- win_bid_price_greater = document_greater["win_bid_price"]
- product_greater = document_greater["product"]
- package_greater = document_greater["package"]
- json_time_greater = document_greater["dict_time"]
- project_name_greater = document_greater["project_name"]
- fingerprint_greater = document_greater["fingerprint"]
- extract_count_greater = document_greater["extract_count"]
- if fingerprint_less==fingerprint_greater:
- return 1
- same_count = 0
- all_count = 8
- if len(set(project_codes_less) & set(project_codes_greater))>0:
- same_count += 1
- if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
- same_count += 1
- if getLength(agency_less)>0 and agency_less==agency_greater:
- same_count += 1
- if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
- same_count += 1
- if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
- same_count += 1
- if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
- same_count += 1
- if getLength(project_name_less)>0 and project_name_less==project_name_greater:
- same_count += 1
- if getLength(doctitle_refine_less)>0 and (doctitle_refine_less==doctitle_refine_greater or doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
- same_count += 1
- base_prob = 0
- if min_counts<3:
- base_prob = 0.9
- elif min_counts<5:
- base_prob = 0.8
- elif min_counts<8:
- base_prob = 0.7
- else:
- base_prob = 0.6
- _prob = base_prob*same_count/all_count
- if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
- _prob = 0.15
- if _prob<0.1:
- return _prob
- check_result = {"pass":1}
- if docchannel_less in (51,102,103,104,115,116,117):
- if doctitle_refine_less!=doctitle_refine_greater:
- if page_time_less!=page_time_greater:
- check_result["docchannel"] = 0
- check_result["pass"] = 0
- else:
- check_result["docchannel"] = 2
- if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
- check_result["doctitle"] = 0
- check_result["pass"] = 0
- if b_log:
- logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
- else:
- check_result["doctitle"] = 2
- #added check
- if not check_codes(project_codes_less,project_codes_greater):
- check_result["code"] = 0
- check_result["pass"] = 0
- if b_log:
- logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
- else:
- if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
- check_result["code"] = 2
- else:
- check_result["code"] = 1
- if not check_product(product_less,product_greater,doctitle_refine_less,doctitle_refine_greater):
- check_result["product"] = 0
- check_result["pass"] = 0
- if b_log:
- logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
- else:
- if getLength(product_less)>0 and getLength(product_greater)>0:
- check_result["product"] = 2
- else:
- check_result["product"] = 1
- if not check_demand():
- check_result["pass"] = 0
- if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
- tenderee_less,tenderee_greater,
- agency_less,agency_greater,
- win_tenderer_less,win_tenderer_greater):
- check_result["entity"] = 0
- check_result["pass"] = 0
- if b_log:
- logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
- else:
- if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
- check_result["entity"] = 2
- elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
- check_result["entity"] = 2
- else:
- check_result["entity"] = 1
- if not check_money(bidding_budget_less,bidding_budget_greater,
- win_bid_price_less,win_bid_price_greater):
- if b_log:
- logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
- check_result["money"] = 0
- check_result["pass"] = 0
- else:
- if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
- check_result["money"] = 2
- elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
- check_result["money"] = 2
- else:
- check_result["money"] = 1
- #added check
- if not check_package(package_less,package_greater):
- if b_log:
- logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
- check_result["package"] = 0
- check_result["pass"] = 0
- else:
- if getLength(package_less)>0 and getLength(package_greater)>0:
- check_result["package"] = 2
- else:
- check_result["package"] = 1
- #added check
- if not check_time(json_time_less,json_time_greater):
- if b_log:
- logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
- if isinstance(json_time_less,dict):
- time_less = json_time_less
- else:
- time_less = json.loads(json_time_less)
- if isinstance(json_time_greater,dict):
- time_greater = json_time_greater
- else:
- time_greater = json.loads(json_time_greater)
- for k,v in time_less.items():
- if getLength(v)>0:
- v1 = time_greater.get(k,"")
- if getLength(v1)>0:
- if v!=v1:
- log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
- check_result["time"] = 0
- check_result["pass"] = 0
- else:
- if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
- check_result["time"] = 2
- else:
- check_result["time"] = 1
- if check_result.get("pass",0)==0:
- if b_log:
- logging.info(str(check_result))
- if check_result.get("money",1)==0:
- return 0
- if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
- return _prob
- else:
- return 0
- if check_result.get("time",1)==0:
- return 0
- return _prob
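- # Legacy scorer kept for reference: eight equally weighted field matches
- # scaled by a base probability that shrinks as the candidate pool grows
- # (0.9 under 3 candidates down to 0.6 at 8 or more), with hard vetoes from
- # the doctitle/code/product/entity/money/package/time checks.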
- def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
- for _ in range(retry_times):
- try:
- _time = time.time()
- check_time = 0
- if isinstance(_query,list):
- bool_query = BoolQuery(should_queries=_query)
- else:
- bool_query = _query
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- list_data = []
- for _dict in list_dict:
- self.post_extract(_dict)
- _docid = _dict.get(document_tmp_docid)
- if merge:
- list_data.append(_dict)
- else:
- if _docid!=item.get(document_tmp_docid):
- _time1 = time.time()
- confidence,day_dis = self.dumplicate_check(item,_dict,total_count,b_log=b_log)
- check_time+= time.time()-_time1
- _dict["confidence"] = confidence
- _dict["min_counts"] = total_count
- list_data.append(_dict)
- all_time = time.time()-_time
- # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
- return list_data
- except Exception as e:
- traceback.print_exc()
- return []
- def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
- list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns,b_log=b_log)
- for _dict in list_dict:
- _docid = _dict.get(document_tmp_docid)
- confidence = _dict["confidence"]
- if b_log:
- log("confidence %d %.3f total_count %d"%(_docid,confidence,_dict.get('min_counts',0)))
- if confidence>0.1:
- if _docid not in set_docid:
- base_list.append(_dict)
- set_docid.add(_docid)
- set_docid.add(_docid)
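- # compile a single dedup rule from a field dict: the rule is dropped when any of its
- # required fields is empty; otherwise the base constraints are merged in and an OTS
- # query is generated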
- def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=False):
- for k,v in _dict.items():
- if getLength(v)==0:
- return
- _dict.update(base_dict)
- if b_log:
- log("rule dict:"+str(_dict))
- _query = self.generate_dumplicate_query(_dict,must_not_dict)
- _rule = {"confidence":confidence,
- "item":item,
- "query":_query,
- "singleNum_keys":[],
- "contain_keys":[],
- "multiNum_keys":[],
- "_dict":_dict}
- list_rules.append(_rule)
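- # translate a document into a prioritized rule list: confidence 100 for a fingerprint hit,
- # 90 for strong field combinations, 85 for weaker pairs (within a narrowed time window),
- # and 80/70 for single-field fallbacks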
- def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False,day_dis=7,table_name ="document_tmp",table_index="document_tmp_index"):
- docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
- current_date = getCurrent_date("%Y-%m-%d")
- if page_time=='':
- page_time = current_date
- two_day_dict = {"page_time":[timeAdd(page_time,-7),timeAdd(page_time,7)]}
- if table_name in {"document_tmp","document"}:
- if page_time>=timeAdd(current_date,-7):
- table_name = "document_tmp"
- table_index = "document_tmp_index"
- base_dict = {
- "docchannel":item.get("docchannel",52),
- "status":[status_from[0]],
- "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
- }
- must_not_dict = {"save":0,"docid":item.get("docid")}
- doctitle_refine_name = "doctitle_refine"
- else:
- table_name = "document"
- table_index = "document_index"
- if get_all:
- _status = [201,450]
- else:
- _status = [201,300]
- base_dict = {
- "docchannel":item["docchannel"],
- "status":_status,
- "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
- }
- must_not_dict = {"docid":item.get("docid")}
- doctitle_refine_name = "doctitle"
- else:
- _status = [201,300]
- base_dict = {
- "docchannel":item["docchannel"],
- "status":_status,
- "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
- }
- must_not_dict = {"docid":item.get("docid")}
- doctitle_refine_name = "doctitle"
- list_rules = []
- singleNum_keys = ["tenderee","win_tenderer"]
- confidence = 100
- self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item,b_log=to_log)
- confidence = 90
- _dict = {document_tmp_agency:agency,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {document_tmp_agency:agency,
- "win_tenderer":win_tenderer,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {document_tmp_agency:agency,
- "win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_tenderer":win_tenderer,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "agency":agency,
- "win_tenderer":win_tenderer}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "agency":agency,
- "win_bid_price":win_bid_price}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "agency":agency,
- "bidding_budget":bidding_budget}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "project_codes":project_code
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_bid_price":win_bid_price
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"agency":agency,
- "project_codes":project_code
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_tenderer":win_tenderer,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- "win_bid_price":win_bid_price
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- "win_tenderer":win_tenderer
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
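- # narrow the page_time window to ±7 days for the lower-confidence rules below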
- base_dict.update(two_day_dict)
- confidence=85
- _dict = {"tenderee":tenderee,
- "agency":agency
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "project_name":project_name
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- if getLength(product)>0:
- l_p = product.split(",")
- _dict = {"tenderee":tenderee,
- "product":l_p[0]
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- "win_tenderer":win_tenderer
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"tenderee":tenderee,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"agency":agency,
- "project_name":project_name
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_codes":project_code,
- "project_name":project_name
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_name":project_name,
- "win_tenderer":win_tenderer
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_name":project_name,
- "win_bid_price":win_bid_price
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_name":project_name,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"project_name":project_name,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_tenderer":win_tenderer,
- "win_bid_price":win_bid_price
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_tenderer":win_tenderer,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_bid_price":win_bid_price,
- "bidding_budget":bidding_budget
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- confidence=80
- _dict = {"project_codes":project_code}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"win_bid_price":win_bid_price,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- _dict = {"bidding_budget":bidding_budget,
- doctitle_refine_name:doctitle_refine
- }
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- confidence=80
- _dict = {doctitle_refine_name:doctitle_refine}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- confidence=70
- _dict = {"project_name":project_name}
- self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
- return list_rules,table_name,table_index
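- # producer: drain the processed-docid feedback queue, then page through document_tmp by
- # status (newest first) and enqueue unseen documents until process_count is reached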
- def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
- q_size = self.queue_dumplicate.qsize()
- log("dumplicate queue size %d"%(q_size))
- while 1:
- try:
- docid = self.queue_dumplicate_processed.get(block=False)
- if docid in self.dumplicate_set:
- self.dumplicate_set.remove(docid)
- except Exception as e:
- break
- if q_size>process_count//3:
- return
- bool_query = BoolQuery(must_queries=[
- RangeQuery(document_tmp_status,*status_from,True,True),
- # TermQuery("docid",271983871)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_update_document,SortOrder.DESC),FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in self.dumplicate_set:
- continue
- self.dumplicate_set.add(docid)
- self.queue_dumplicate.put(_dict)
- _count = len(list_dict)
- while next_token and _count<process_count:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- docid = _dict.get(document_tmp_docid)
- if docid in self.dumplicate_set:
- continue
- self.dumplicate_set.add(docid)
- self.queue_dumplicate.put(_dict)
- _count += len(list_dict)
- # _l = list(self.dumplicate_set)
- # _l.sort(key=lambda x:x,reverse=True)
- # self.dumplicate_set = set(_l[:flow_process_count*2])
- def comsumer_flow_dumplicate(self):
- mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
- mt.run()
- def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
- self.producer_flow_dumplicate(process_count=process_count,status_from=status_from)
- # self.comsumer_flow_dumplicate()
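- # consumer: spawn several worker processes, each running a thread pool over the dedup
- # queue, and restart any process that dies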
- def flow_dumpcate_comsumer(self):
- from multiprocessing import Process
- process_count = 6
- thread_count = 12
- list_process = []
- def start_thread():
- mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,restart=True,timeout=600,ots_client=self.ots_client)
- mt.run()
- for _ in range(process_count):
- p = Process(target=start_thread)
- list_process.append(p)
- for p in list_process:
- p.start()
- while 1:
- for _i in range(len(list_process)):
- p = list_process[_i]
- if not p.is_alive():
- p = Process(target=start_thread)
- list_process[_i] = p
- p.start()
- time.sleep(1)
- # mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,40,1,ots_client=self.ots_client)
- # mt.run()
- def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment,document_tenderee_code,document_agency_code,document_candidates],document_name="document"):
- '''
- Query the announcement contents by docid; try document_tmp first, then fall back to document
- :param list_docids:
- :return:
- '''
- list_docs = []
- set_fingerprint = set()
- for _docid in list_docids:
- docid = int(_docid)
- _dict = {document_partitionkey:getPartitionKey(docid),
- document_docid:docid}
- if document_name in {"document","document_tmp"}:
- _doc = Document_tmp(_dict)
- _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
- if not _exists:
- _doc = Document(_dict)
- _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
- else:
- _doc = Document(_dict)
- _doc.table_name = document_name
- _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
- if _exists:
- _fingerprint = _doc.getProperties().get(document_fingerprint)
- if _fingerprint in set_fingerprint:
- continue
- set_fingerprint.add(_fingerprint)
- list_docs.append(_doc)
- for _doc in list_docs:
- try:
- _sub_docs_json = _doc.getProperties().get(document_tmp_sub_docs_json)
- if _sub_docs_json is not None:
- _doc.setValue("sub_docs",json.loads(_sub_docs_json),False)
- except Exception as e:
- traceback.print_exc()
- list_docs.sort(key=lambda x:x.getProperties().get(document_page_time,""))
- return list_docs
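- # two package records are treated as the same when none of sub-project name, win tenderer,
- # win bid price or bidding budget conflict (empty / zero values match anything)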
- def is_same_package(self,_dict1,_dict2):
- sub_project_name1 = _dict1.get(project_sub_project_name,"")
- if sub_project_name1=="Project":
- sub_project_name1 = ""
- win_tenderer1 = _dict1.get(project_win_tenderer,"")
- win_bid_price1 = _dict1.get(project_win_bid_price,0)
- bidding_budget1 = _dict1.get(project_bidding_budget,0)
- sub_project_name2 = _dict2.get(project_sub_project_name,"")
- if sub_project_name2=="Project":
- sub_project_name2 = ""
- win_tenderer2 = _dict2.get(project_win_tenderer,"")
- win_bid_price2 = _dict2.get(project_win_bid_price,0)
- bidding_budget2 = _dict2.get(project_bidding_budget,0)
- _set = set([a for a in [sub_project_name1,sub_project_name2] if a!=""])
- if len(_set)>1:
- return False
- _set = set([a for a in [win_tenderer1,win_tenderer2] if a!=""])
- if len(_set)>1:
- return False
- _set = set([a for a in [win_bid_price1,win_bid_price2] if a!=0])
- if len(_set)>1:
- return False
- _set = set([a for a in [bidding_budget1,bidding_budget2] if a!=0])
- if len(_set)>1:
- return False
- return True
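- # keep only meaningful values: drop None, empty strings and zero numbers,
- # e.g. {"a":None,"b":"","c":0,"d":"x"} -> {"d":"x"}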
- def getUpdate_dict(self,_dict):
- update_dict = {}
- for k,v in _dict.items():
- if v is None:
- continue
- if isinstance(v,str):
- if v=="":
- continue
- if isinstance(v,(float,int)):
- if v==0:
- continue
- update_dict[k] = v
- return update_dict
- def update_projects_by_document(self,docid,save,projects,document_name="document"):
- '''
- Update the document-derived properties of the given projects
- :param docid:
- :param projects: the collection of projects
- :param action: add/delete; add appends the unique properties, delete removes them
- :return:
- '''
- list_docs = self.search_docs([docid],document_name=document_name)
- docs = [_doc.getProperties() for _doc in list_docs]
- project_dict = generate_common_properties(docs)
- list_package_properties = generate_packages_properties(docs)
- _dict = {}
- # update the common properties
- _replace_replace = False
- v = project_dict.get(document_district,"")
- if not (v is None or v=="" or v=="[]" or v=="未知"):
- _replace_replace = True
- for k,v in project_dict.items():
- if not _replace_replace:
- if k in [document_district,document_city,document_province,document_area]:
- continue
- if v is None or v=="" or v=="[]" or v=="未知":
- continue
- if k in (project_project_dynamics,project_product,project_project_codes,project_docids,project_candidates,project_zhong_biao_page_time,project_zhao_biao_page_time,project_page_time,project_docchannel):
- continue
- _dict[k] = v
- for _proj in projects:
- _proj.update(_dict)
- for _proj in projects:
- if _proj.get(project_page_time,"")<=project_dict.get(project_page_time,""):
- _proj[project_page_time] = project_dict.get(project_page_time,"")
- _proj[project_docchannel] = project_dict.get(project_docchannel,"")
- else:
- if project_docchannel in project_dict:
- project_dict.pop(project_docchannel)
- # only take the earlier zhao/zhong biao time when the document actually carries one,
- # otherwise a missing value would erase the project's existing time
- if project_dict.get(project_zhong_biao_page_time,"")!="" and _proj.get(project_zhong_biao_page_time,"")>project_dict.get(project_zhong_biao_page_time,""):
- _proj[project_zhong_biao_page_time] = project_dict.get(project_zhong_biao_page_time,"")
- if project_dict.get(project_zhao_biao_page_time,"")!="" and _proj.get(project_zhao_biao_page_time,"")>project_dict.get(project_zhao_biao_page_time,""):
- _proj[project_zhao_biao_page_time] = project_dict.get(project_zhao_biao_page_time,"")
- for _proj in projects:
- # concatenate the accumulated attributes
- append_dict = {}
- set_docid = set()
- set_product = set()
- set_code = set()
- set_nlp_enterprise = set()
- set_nlp_enterprise_attachment = set()
- set_candidates = set()
- _docids = _proj.get(project_docids,"")
- _codes = _proj.get(project_project_codes,"")
- _product = _proj.get(project_product,"")
- set_docid = set(_docids.split(","))
- if save==1:
- set_docid.add(str(docid))
- else:
- if str(docid) in set_docid:
- set_docid.remove(str(docid))
- set_code = set_code | set(_codes.split(","))
- set_product = set_product | set(_product.split(","))
- list_candidates = [] # ensure defined even if the json parsing below fails
- try:
- set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
- set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
- list_candidates = json.loads(project_dict.get(project_candidates,"[]"))
- for item in list_candidates:
- if item.get("name") is not None and item.get("name") not in set_candidates:
- set_candidates.add(item.get("name"))
- set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
- set_product = set_product | set(project_dict.get(project_product,"").split(","))
- set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
- set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
- for item in json.loads(_proj.get(project_candidates,"[]")):
- if item.get("name") is not None and item.get("name") not in set_candidates:
- set_candidates.add(item.get("name"))
- list_candidates.append(item)
- except Exception as e:
- pass
- append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
- append_dict[project_docid_number] = len(set_docid)
- append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
- append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
- append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
- append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
- append_dict[project_candidates] = json.dumps(list_candidates,ensure_ascii=False)
- dict_dynamic = {}
- set_docid = set()
- _dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
- for _dy in _dynamic:
- _docid = _dy.get("docid")
- dict_dynamic[_docid] = _dy
- _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
- for _dy in _dynamic:
- _docid = _dy.get("docid")
- dict_dynamic[_docid] = _dy
- list_dynamics = []
- for k,v in dict_dynamic.items():
- list_dynamics.append(v)
- list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
- append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
- _proj.update(append_dict)
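- # index the existing packages by (sub_project_name, win_tenderer, price/budget) keys so the
- # freshly generated package properties can be matched to the project that already holds them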
- dict_package = {}
- for _pp in projects:
- _counts = 0
- sub_project_name = _pp.get(project_sub_project_name,"")
- if sub_project_name=="Project":
- sub_project_name = ""
- win_tenderer = _pp.get(project_win_tenderer,"")
- win_bid_price = _pp.get(project_win_bid_price,0)
- bidding_budget = _pp.get(project_bidding_budget,0)
- if win_tenderer!="" and bidding_budget!=0:
- _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
- dict_package[_key] = _pp
- _counts += 1
- if win_tenderer!="" and win_bid_price!=0:
- _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
- dict_package[_key] = _pp
- _counts +=1
- if _counts==0:
- if win_tenderer!="":
- _key = "%s-%s"%(sub_project_name,win_tenderer)
- dict_package[_key] = _pp
- _counts += 1
- if bidding_budget!=0:
- _key = "%s-%s"%(sub_project_name,str(bidding_budget))
- dict_package[_key] = _pp
- _counts += 1
- # update the package-level (private) properties
- if len(projects)==1 and len(list_package_properties)==1:
- _pp = list_package_properties[0]
- pp = projects[0]
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,pp.get("uuid"))
- # apply the cleaned update dict (with the merged uuid) rather than the raw package properties
- pp.update(ud)
- else:
- for _pp in list_package_properties:
- flag_update = False
- sub_project_name = _pp.get(project_sub_project_name,"")
- if sub_project_name=="Project":
- sub_project_name = ""
- win_tenderer = _pp.get(project_win_tenderer,"")
- win_bid_price = _pp.get(project_win_bid_price,0)
- bidding_budget = _pp.get(project_bidding_budget,0)
- if win_tenderer!="" and bidding_budget!=0:
- _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
- if _key in dict_package:
- if self.is_same_package(_pp,dict_package[_key]):
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,dict_package[_key].get("uuid"))
- dict_package[_key].update(ud)
- flag_update = True
- continue
- if win_tenderer!="" and win_bid_price!=0:
- _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
- if _key in dict_package:
- if self.is_same_package(_pp,dict_package[_key]):
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,dict_package[_key].get("uuid"))
- dict_package[_key].update(ud)
- flag_update = True
- continue
- if win_tenderer!="":
- _key = "%s-%s"%(sub_project_name,win_tenderer)
- if _key in dict_package:
- if self.is_same_package(_pp,dict_package[_key]):
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,dict_package[_key].get("uuid"))
- dict_package[_key].update(ud)
- flag_update = True
- continue
- if bidding_budget!=0:
- _key = "%s-%s"%(sub_project_name,str(bidding_budget))
- if _key in dict_package:
- if self.is_same_package(_pp,dict_package[_key]):
- ud = self.getUpdate_dict(_pp)
- self.set_project_uuid(ud,dict_package[_key].get("uuid"))
- dict_package[_key].update(ud)
- flag_update = True
- continue
- if not flag_update:
- _pp.update(project_dict)
- projects.append(_pp)
- _counts = 0
- if win_tenderer!="" and bidding_budget!=0:
- _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
- dict_package[_key] = _pp
- _counts += 1
- if win_tenderer!="" and win_bid_price!=0:
- _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
- dict_package[_key] = _pp
- _counts +=1
- if _counts==0:
- if win_tenderer!="":
- _key = "%s-%s"%(sub_project_name,win_tenderer)
- dict_package[_key] = _pp
- _counts += 1
- if bidding_budget!=0:
- _key = "%s-%s"%(sub_project_name,str(bidding_budget))
- dict_package[_key] = _pp
- _counts += 1
- def delete_projects_by_document(self,docid):
- '''
- Remove the given document from its projects: mark the matched projects for deletion and regenerate projects from the remaining documents
- :param docid:
- :return:
- '''
- set_docid = set()
- list_delete_projects = []
- list_projects = self.search_projects_with_document([docid])
- for _proj in list_projects:
- _p = {}
- _docids = _proj.get(project_docids,"")
- log("deleting project uuid:%s"%(str(_proj.get(project_uuid))))
- _p["delete_uuid"] = _proj.get(project_uuid)
- _p["to_delete"] = True
- list_delete_projects.append(_p)
- if _docids!="":
- set_docid = set_docid | set(_docids.split(","))
- if str(docid) in set_docid:
- set_docid.remove(str(docid))
- list_docid = list(set_docid)
- list_projects = []
- if len(list_docid)>0:
- list_docs = self.search_docs(list_docid)
- log("delete:search_docs done")
- list_projects = self.generate_projects_from_document(list_docs)
- log("delete:generate_projects_from_document done")
- list_projects = dumplicate_projects(list_projects,max_count=20)
- log("delete:dumplicate_projects done")
- list_projects.extend(list_delete_projects)
- project_json = to_project_json(list_projects)
- return project_json
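- # MQ handler for delete messages: rebuild the projects without the deleted docid, persist
- # the result to the project_process table, and ack the message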
- def delete_doc_handle(self,_dict,result_queue):
- try:
- headers = _dict.get("frame")
- conn = _dict.get("conn")
- if headers is not None:
- message_id = headers.headers["message-id"]
- body = headers.body
- item = json.loads(body)
- docid = item.get("docid")
- log("==========start delete docid:%s"%(str(docid)))
- if docid is None:
- # nothing to delete; ack and stop
- ackMsg(conn,message_id)
- return
- delete_result = self.delete_projects_by_document(docid)
- log("1")
- _uuid = uuid4().hex
- _d = {PROJECT_PROCESS_UUID:_uuid,
- PROJECT_PROCESS_CRTIME:1,
- PROJECT_PROCESS_PROJECTS:delete_result}
- _pp = Project_process(_d)
- log("2")
- try:
- if _pp.update_row(self.ots_client):
- ackMsg(conn,message_id)
- except Exception as e:
- ackMsg(conn,message_id)
- log("3")
- # no longer pushed to the result queue; written to the project_process table instead
- # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
- # ackMsg(conn,message_id)
- log("==========end delete docid:%s"%(str(docid)))
- else:
- log("has not headers")
- except Exception as e:
- traceback.print_exc()
- ackMsg(conn,message_id)
- log("==========end delete docid:%s"%(str(docid)))
- def generate_common_properties(self,list_docs):
- '''
- Generate the common (project-level) properties from a list of documents
- :param list_docs:
- :return:
- '''
- # pick each value by occurrence count (majority vote)
- choose_dict = {}
- project_dict = {}
- for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
- for _doc in list_docs:
- _value = _doc.getProperties().get(_key,"")
- if _value!="":
- if _key not in choose_dict:
- choose_dict[_key] = {}
- if _value not in choose_dict[_key]:
- choose_dict[_key][_value] = 0
- choose_dict[_key][_value] += 1
- _find = False
- for _key in [document_district,document_city,document_province,document_area]:
- area_dict = {}
- for _doc in list_docs:
- loc = _doc.getProperties().get(_key,"未知")
- if loc not in ('全国','未知',"0"):
- if loc not in area_dict:
- area_dict[loc] = 0
- area_dict[loc] += 1
- list_loc = []
- for k,v in area_dict.items():
- list_loc.append([k,v])
- list_loc.sort(key=lambda x:x[1],reverse=True)
- if len(list_loc)>0:
- project_dict[document_district] = _doc.getProperties().get(document_district)
- project_dict[document_city] = _doc.getProperties().get(document_city)
- project_dict[document_province] = _doc.getProperties().get(document_province)
- project_dict[document_area] = _doc.getProperties().get(document_area)
- _find = True
- break
- if not _find:
- if len(list_docs)>0:
- project_dict[document_district] = list_docs[0].getProperties().get(document_district)
- project_dict[document_city] = list_docs[0].getProperties().get(document_city)
- project_dict[document_province] = list_docs[0].getProperties().get(document_province)
- project_dict[document_area] = list_docs[0].getProperties().get(document_area)
- for _key,_value in choose_dict.items():
- _l = []
- for k,v in _value.items():
- _l.append([k,v])
- _l.sort(key=lambda x:x[1],reverse=True)
- if len(_l)>0:
- _v = _l[0][0]
- if _v in ('全国','未知'):
- if len(_l)>1:
- _v = _l[1][0]
- project_dict[_key] = _v
- list_dynamics = []
- docid_number = 0
- visuable_docids = []
- zhao_biao_page_time = ""
- zhong_biao_page_time = ""
- list_codes = []
- list_product = []
- p_page_time = ""
- remove_docids = set()
- for _doc in list_docs:
- table_name = _doc.getProperties().get("table_name")
- status = _doc.getProperties().get(document_status,0)
- _save = _doc.getProperties().get(document_tmp_save,1)
- doctitle = _doc.getProperties().get(document_doctitle,"")
- docchannel = _doc.getProperties().get(document_docchannel)
- page_time = _doc.getProperties().get(document_page_time,"")
- _docid = _doc.getProperties().get(document_docid)
- _bidway = _doc.getProperties().get(document_bidway,"")
- _docchannel = _doc.getProperties().get(document_life_docchannel,0)
- project_codes = _doc.getProperties().get(document_project_codes)
- product = _doc.getProperties().get(document_product)
- sub_docs = _doc.getProperties().get("sub_docs",[])
- is_multipack = True if len(sub_docs)>1 else False
- extract_count = _doc.getProperties().get(document_tmp_extract_count,0)
- if product is not None:
- list_product.extend(product.split(","))
- if project_codes is not None:
- _c = project_codes.split(",")
- list_codes.extend(_c)
- if p_page_time=="":
- p_page_time = page_time
- if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
- zhao_biao_page_time = page_time
- if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
- zhong_biao_page_time = page_time
- is_visuable = 0
- if table_name=="document":
- if status>=201 and status<=300:
- docid_number +=1
- visuable_docids.append(str(_docid))
- is_visuable = 1
- else:
- remove_docids.add(str(_docid))
- else:
- if _save==1:
- docid_number +=1
- visuable_docids.append(str(_docid))
- is_visuable = 1
- else:
- remove_docids.add(str(_docid))
- list_dynamics.append({document_docid:_docid,
- document_doctitle:doctitle,
- document_docchannel:_docchannel,
- document_bidway:_bidway,
- document_page_time:page_time,
- document_status:201 if is_visuable==1 else 401,
- "is_multipack":is_multipack,
- document_tmp_extract_count:extract_count
- }
- )
- project_dict[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
- project_dict[project_docid_number] = docid_number
- project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
- if zhao_biao_page_time !="":
- project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
- if zhong_biao_page_time !="":
- project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
- project_dict[project_project_codes] = ",".join(list(set(list_codes)))
- project_dict[project_page_time] = p_page_time
- project_dict[project_product] = ",".join(list(set(list_product)))
- return project_dict
- def generate_packages_properties(self,list_docs):
- '''
- Generate the per-package (sub-bid) properties
- :param list_docs:
- :return:
- '''
- list_properties = []
- set_key = set()
- for _doc in list_docs:
- _dict = {}
- sub_docs = _doc.getProperties().get("sub_docs")
- if sub_docs is not None:
- for _d in sub_docs:
- sub_project_code = _d.get(project_sub_project_code,"")
- sub_project_name = _d.get(project_sub_project_name,"")
- win_tenderer = _d.get(project_win_tenderer,"")
- win_bid_price = _d.get(project_win_bid_price,"")
- _key = "%s-%s-%s-%s"%(sub_project_code,sub_project_name,win_tenderer,win_bid_price)
- if _key in set_key:
- continue
- set_key.add(_key)
- list_properties.append(_d)
- return list_properties
- def generate_projects_from_document(self,list_docs):
- '''
- Generate projects from announcements
- :param list_docs:
- :return:
- '''
- # determine the number of sub-bids
- list_projects = generate_projects([doc.getProperties() for doc in list_docs])
- return list_projects
- def search_projects_with_document(self,list_docids,project_table,project_table_index):
- '''
- Query the projects associated with a set of docids
- :param list_docids:
- :return:
- '''
- log("search_projects_with_document %s"%str(list_docids))
- list_should_q = []
- for _docid in list_docids:
- list_should_q.append(TermQuery("docids",_docid))
- bool_query = BoolQuery(should_queries=list_should_q)
- _query = {"query":bool_query,"limit":20}
- list_project_dict = getDocument(_query,self.ots_client,[
- project_uuid,project_docids,project_zhao_biao_page_time,
- project_zhong_biao_page_time,
- project_page_time,
- project_area,
- project_province,
- project_city,
- project_district,
- project_info_type,
- project_industry,
- project_qcodes,
- project_project_name,
- project_project_code,
- project_project_codes,
- project_project_addr,
- project_tenderee,
- project_tenderee_addr,
- project_tenderee_phone,
- project_tenderee_contact,
- project_agency,
- project_agency_phone,
- project_agency_contact,
- project_sub_project_name,
- project_sub_project_code,
- project_bidding_budget,
- project_win_tenderer,
- project_win_bid_price,
- project_win_tenderer_manager,
- project_win_tenderer_phone,
- project_second_tenderer,
- project_second_bid_price,
- project_second_tenderer_manager,
- project_second_tenderer_phone,
- project_third_tenderer,
- project_third_bid_price,
- project_third_tenderer_manager,
- project_third_tenderer_phone,
- project_procurement_system,
- project_bidway,
- project_dup_data,
- project_docid_number,
- project_project_dynamics,
- project_product,
- project_moneysource,
- project_service_time,
- project_time_bidclose,
- project_time_bidopen,
- project_time_bidstart,
- project_time_commencement,
- project_time_completion,
- project_time_earnest_money_start,
- project_time_earnest_money_end,
- project_time_get_file_end,
- project_time_get_file_start,
- project_time_publicity_end,
- project_time_publicity_start,
- project_time_registration_end,
- project_time_registration_start,
- project_time_release,
- project_dup_docid,
- project_info_source,
- project_nlp_enterprise,
- project_nlp_enterprise_attachment,
- project_tenderee_code,
- project_agency_code,
- project_candidates,
- project_docchannel
- ],sort="page_time",table_name=project_table,table_index=project_table_index)
- return list_project_dict
- def set_project_uuid(self,_dict,_uuid):
- if _uuid is not None and _uuid!="":
- if "uuid" in _dict:
- _dict["uuid"] = "%s,%s"%(_dict["uuid"],_uuid)
- else:
- _dict["uuid"] = _uuid
- def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district):
- whole_time_start = time.time()
- _time = time.time()
- list_query = []
- list_code = [a for a in project_codes.split(",") if a!='']
- should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
- # print("should_q_code",[a for a in list_code[:20]])
- should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
- list_product = [a for a in product.split(",") if a!='']
- should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
- should_q_area = None
- if province!="" or city!="" or district!="":
- should_q = []
- if province not in ("","全国","未知") and province is not None:
- should_q.append(TermQuery(project_province,province))
- if city not in ("","全国","未知") and city is not None:
- should_q.append(TermQuery(project_city,city))
- if district not in ("","全国","未知") and district is not None:
- should_q.append(TermQuery(project_district,district))
- if len(should_q)>0:
- should_q_area = BoolQuery(should_queries=should_q)
- prepare_time = time.time()-_time
- _time = time.time()
- # log("list_code %s"%(str(list_code)))
- # log("list_product %s"%(str(list_product)))
- # log("tenderee %s"%(tenderee))
- # log("bidding_budget %s"%(bidding_budget))
- # log("win_tenderer %s"%(win_tenderer))
- # log("win_bid_price %s"%(win_bid_price))
- # log("project_name %s"%(project_name))
- log_time = time.time()-_time
- _time = time.time()
- if tenderee!="" and len(list_code)>0:
- _query = [TermQuery(project_tenderee,tenderee),
- should_q_code,
- ]
- list_query.append([_query,2])
- _query = [TermQuery(project_tenderee,tenderee),
- should_q_cod
- ]
- list_query.append([_query,2])
- if tenderee!="" and len(list_product)>0:
- _query = [TermQuery(project_tenderee,tenderee),
- should_q_product]
- list_query.append([_query,1])
- if tenderee!="" and project_name!="":
- _query = [TermQuery(project_tenderee,tenderee),
- TermQuery(project_project_name,project_name)]
- list_query.append([_query,2])
- if tenderee!="" and agency!="":
- _query = [TermQuery(project_tenderee,tenderee),
- TermQuery(project_agency,agency)]
- list_query.append([_query,0])
- if tenderee!="" and float(bidding_budget)>0:
- _query = [TermQuery(project_tenderee,tenderee),
- TermQuery(project_bidding_budget,bidding_budget)]
- list_query.append([_query,2])
- if float(bidding_budget)>0 and float(win_bid_price)>0:
- _query = [TermQuery(project_bidding_budget,bidding_budget),
- TermQuery(project_win_bid_price,win_bid_price)]
- list_query.append([_query,2])
- if tenderee!="" and win_tenderer!="":
- _query = [TermQuery(project_tenderee,tenderee),
- TermQuery(project_win_tenderer,win_tenderer)]
- list_query.append([_query,2])
- if agency!="" and win_tenderer!="":
- _query = [TermQuery(project_agency,agency),
- TermQuery(project_win_tenderer,win_tenderer)]
- list_query.append([_query,0])
- if agency!="" and len(list_product)>0:
- _query = [TermQuery(project_agency,agency),
- should_q_product]
- list_query.append([_query,1])
- if win_tenderer!="" and len(list_code)>0:
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- should_q_code]
- list_query.append([_query,2])
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- should_q_cod]
- list_query.append([_query,2])
- if win_tenderer!="" and sub_project_name!="":
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- TermQuery(project_sub_project_name,sub_project_name)
- ]
- list_query.append([_query,2])
- if win_tenderer!="" and float(win_bid_price)>0:
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- TermQuery(project_win_bid_price,win_bid_price)]
- list_query.append([_query,2])
- if win_tenderer!="" and float(bidding_budget)>0:
- _query = [TermQuery(project_win_tenderer,win_tenderer),
- TermQuery(project_bidding_budget,bidding_budget)]
- list_query.append([_query,2])
- if len(list_code)>0 and len(list_product)>0:
- _query = [should_q_code,
- should_q_product]
- list_query.append([_query,2])
- if len(list_code)>0:
- _query = [
- should_q_code]
- list_query.append([_query,2])
- _query = [
- should_q_cod]
- list_query.append([_query,1])
- if project_name!="" and project_name is not None:
- _query = [
- TermQuery(project_project_name,project_name)]
- list_query.append([_query,1])
- _query_title = [MatchPhraseQuery(project_doctitles,project_name)]
- list_query.append([_query_title,1])
- if len(list_product)>0 and should_q_area is not None:
- _query = [should_q_area,
- should_q_product]
- list_query.append([_query,0])
- generate_time = time.time()-_time
- whole_time = time.time()-whole_time_start
- # log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
- return list_query
- def merge_projects(self,list_projects,b_log=False,check_columns=[project_uuid,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_project_name,project_project_code,project_project_codes,project_tenderee,project_agency,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_project_dynamics,project_product,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_nlp_enterprise,project_nlp_enterprise_attachment,project_docids,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source],project_table="project2",project_table_index="project2_index_formerge"):
- '''
- Merge the given projects with existing ones in storage
- :return:
- '''
- try:
- whole_time_start = time.time()
- set_uuid = set()
- for _proj in list_projects:
- _uuid = _proj.get("uuid")
- if _uuid is not None:
- set_uuid = set_uuid | set(_uuid.split(","))
- projects_merge_count = 0
- projects_check_rule_time = 0
- projects_update_time = 0
- projects_query_time = 0
- projects_prepare_time = 0
- current_date = getCurrent_date("%Y-%m-%d")
- min_date = timeAdd(current_date,-35,format="%Y-%m-%d")
- search_table = "project2"
- search_table_index = "project2_index_formerge"
- project_cls = Project
- docids = ""
- for _proj in list_projects[:30]:
- must_not_q = []
- for _uuid in list(set_uuid):
- must_not_q.append(TermQuery("uuid",_uuid))
- docids = _proj.get(project_docids,"")
- page_time = _proj.get(project_page_time,"")
- project_codes = _proj.get(project_project_codes,"")
- project_name = _proj.get(project_project_name,"")
- tenderee = _proj.get(project_tenderee,"")
- agency = _proj.get(project_agency,"")
- product = _proj.get(project_product,"")
- sub_project_name = _proj.get(project_sub_project_name,"")
- bidding_budget = _proj.get(project_bidding_budget,-1)
- win_tenderer = _proj.get(project_win_tenderer,"")
- win_bid_price = _proj.get(project_win_bid_price,-1)
- _dynamic = _proj.get(project_project_dynamics,"[]")
- is_yanshou = False
- list_dynamic = json.loads(_dynamic)
- for _d in list_dynamic:
- _title = _d.get("doctitle","")
- if re.search("验收公[示告]|验收结果",_title) is not None or _d.get("docchannel")==122:
- is_yanshou = True
- break
- province = _proj.get(project_province,"")
- city = _proj.get(project_city,"")
- district = _proj.get(project_district,"")
- if is_yanshou:
- page_time_less = timeAdd(page_time,-850)
- page_time_greater = timeAdd(page_time,820)
- else:
- page_time_less = timeAdd(page_time,-450)
- page_time_greater = timeAdd(page_time,420)
- sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
- _time = time.time()
- list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
- list_merge_data = []
- search_table = "project2"
- search_table_index = "project2_index_formerge"
- project_cls = Project
- search_table = project_table
- search_table_index = project_table_index
- # print("page_time,min_date",page_time,min_date)
- # if page_time>=min_date:
- # search_table = "project2_tmp"
- # search_table_index = "project2_tmp_index"
- # project_cls = Project_tmp
- _step = 2
- _begin = 0
- must_queries = []
- if page_time_less is not None and page_time_greater is not None:
- must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
- # RangeQuery("status",201,301)
- ]
- # sub_project_name is not a mandatory condition
- # if sub_project_q is not None:
- # must_queries.append(sub_project_q)
- projects_prepare_time += time.time()-_time
- _time = time.time()
- sort_type = SortOrder.DESC
- while _begin<len(list_must_query):
- # alternate the sort order between batches so both ends of the page_time range are sampled
- if sort_type==SortOrder.DESC:
- sort_type=SortOrder.ASC
- elif sort_type==SortOrder.ASC:
- sort_type=SortOrder.DESC
- list_should_q = []
- _limit = 10
- for must_q,_count in list_must_query[_begin:_begin+_step]:
- must_q1 = list(must_q)
- must_q1.extend(must_queries)
- list_should_q.append(BoolQuery(must_queries=must_q1))
- _limit += _count*5
- _query = BoolQuery(
- should_queries=list_should_q,
- must_not_queries=must_not_q[:100]
- )
- # rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
- # SearchQuery(_query,limit=_limit),
- # columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
- rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search(search_table,search_table_index,
- SearchQuery(_query,sort=Sort(sorters=[FieldSort(project_page_time,sort_type)]),limit=_limit),
- columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- list_merge_data.extend(list_data)
- # print(list_data)
- for _data in list_data:
- must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
- _begin += _step
- projects_query_time += time.time()-_time
- # prefer candidates with a similar bidding budget
- projects_merge_count = len(list_merge_data)
- list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
- list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
- # log(page_time_less+"=="+page_time_greater)
- if b_log:
- log("list_merge_data count:%d"%(len(list_merge_data)))
- list_check_data = []
- for _data in list_merge_data:
- _time = time.time()
- _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
- if b_log:
- log(str(_check))
- projects_check_rule_time += time.time()-_time
- if _check:
- list_check_data.append([_data,_prob])
- list_check_data.sort(key=lambda x:x[1],reverse=True)
- for _data,_ in list_check_data:
- _time = time.time()
- _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
- projects_check_rule_time += time.time()-_time
- _time = time.time()
- if _check:
- # o_proj = project_cls(_data)
- # o_proj.fix_columns(self.ots_client,fix_columns,True)
- # for k in fix_columns:
- # _data[k] = o_proj.getProperties().get(k)
- update_projects_by_project(_data,[_proj])
- projects_update_time += time.time()-_time
- whole_time = time.time()-whole_time_start
- log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
- return list_projects
- except Exception as e:
- traceback.print_exc()
- raise
- def dumplicate_document_in_merge(self,list_projects,dup_docid,_docid,_docchannel,document_name="document",b_log=False):
- '''
- Deduplicate documents during the merge
- :param list_projects:
- :return:
- '''
- dup_docid = set([str(a) for a in dup_docid])
- set_dup_total = set()
- docid_item = self.get_attrs_before_dump(_docid)
- best_docid = None
- for _proj in list_projects:
- try:
- docids = _proj.get(project_docids,"")
- set_docids = set([a for a in docids.split(",") if a!=""])
- _project_dynamics = _proj.get(project_project_dynamics,"[]")
- list_dynamics = json.loads(_project_dynamics)
- set_dup_docid = set()
- list_dup_result = [(_docid,docid_item.get("extract_count"))]
- log("=========%s---%s"%(str(set_docids),str(_docid)))
- if str(_docid) in set_docids:
- list_to_dup_docid = []
- for _d in list_dynamics:
- docid = _d.get(document_docid)
- doctitle = _d.get(document_doctitle,"")
- docchannel = _d.get(document_docchannel,0)
- status = _d.get(document_status,0)
- if status>=401:
- continue
- if str(docid) not in set_docids:
- continue
- if str(docid) in dup_docid:
- continue
- if docchannel!=_docchannel:
- continue
- if docid==_docid:
- continue
- list_to_dup_docid.append(_d)
- for _d in list_to_dup_docid:
- docid = _d.get(document_docid)
- _item = self.get_attrs_before_dump(docid)
- _prob = check_dumplicate_rule(docid_item,_item,5,b_log=b_log)
- log("dumplicate_document_in_merge %s-%s prob %.2f"%(str(_docid),str(docid),_prob))
- if _prob>0.4:
- docid = int(docid)
- _d = {"partitionkey":docid%500+1,
- "docid":docid,
- }
- _doc = Document(_d)
- _doc.table_name = document_name
- if _doc.fix_columns(self.ots_client,[document_page_time,document_update_document],True):
- if _doc.getProperties().get(document_update_document,"")!="true":
- list_dup_result.append((docid,_item.get("extract_count")))
- list_dup_result.sort(key=lambda x:x[0])
- list_dup_result.sort(key=lambda x:x[1],reverse=True)
- if len(list_dup_result)>0:
- best_docid1 = list_dup_result[0][0]
- if best_docid1 not in set_dup_total:
- best_docid = best_docid1
- for _d in list_dup_result[1:]:
- set_dup_docid.add(str(_d[0]))
- for _dynamic in list_dynamics:
- if _dynamic.get(document_docid) in set_dup_docid:
- _dynamic[document_status] = 401
- set_docids = set_docids-set_dup_docid-dup_docid
- set_dup_total |= set_dup_docid
- if len(set_docids)==0:
- log("set_dup_docid:%s dup_docid:%s"%(str(set_dup_docid),str(dup_docid)))
- log("projects set_docids length is zero %s"%(docids))
- return None,None
- else:
- _proj[project_docids] = ",".join(list(set_docids))
- _proj[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
- _proj[project_docid_number] = len(set_docids)
- _proj[project_dup_docid] = ",".join(list(set_dup_docid))
- # log("dumplicate_document docid%s dynamic %d takes%.3f"%(str(docid),len(list_dynamics),time.time()-_time))
- except Exception as e:
- traceback.print_exc()
- # set_dup_total holds string docids, so compare against the string form
- if best_docid is not None and str(best_docid) in set_dup_total:
- best_docid = None
- return best_docid,list(set_dup_total)
- def merge_document_real(self,item,dup_docid,save,document_name="document",project_table="project2",project_table_index="project2_index_formerge",b_log=False):
- '''
- Real-time project merging
- :param item:
- :param dup_docid: the collection of duplicated announcements
- :param status_to:
- :return:
- '''
- try:
- list_docids = []
- _docid = item.get(document_tmp_docid)
- list_docids.append(_docid)
- print("dup_docid",dup_docid)
- if save==0:
- dup_docid.insert(0,_docid)
- if isinstance(dup_docid,list):
- list_docids.extend(dup_docid)
- list_docids = [a for a in list_docids if a is not None]
- _time = time.time()
- list_projects = self.search_projects_with_document(list_docids,project_table,project_table_index)
- log("search %d projects takes:%.3f"%(len(list_projects),time.time()-_time))
- if len(list_projects)==0:
- # _time = time.time()
- list_docs = self.search_docs(list_docids,document_name=document_name)
- # log("search document takes:%.3f"%(time.time()-_time))
- # _time = time.time()
- list_projects = self.generate_projects_from_document(list_docs)
- # log("generate projects takes:%.3f"%(time.time()-_time))
- else:
- _time = time.time()
- self.update_projects_by_document(_docid,save,list_projects,document_name=document_name)
- # log("update projects takes:%.3f"%(time.time()-_time))
- _time = time.time()
- list_projects = dumplicate_projects(list_projects)
- # log("dumplicate projects takes:%.3f"%(time.time()-_time))
- _time = time.time()
- list_projects = self.merge_projects(list_projects,b_log,project_table=project_table,project_table_index=project_table_index)
- # log("merge projects takes:%.3f"%(time.time()-_time))
- _time = time.time()
- best_docid,list_merge_dump = self.dumplicate_document_in_merge(list_projects,dup_docid,_docid,item.get(document_docchannel),document_name=document_name,b_log=b_log)
- # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
- if list_merge_dump is None:
- list_projects = []
- _time = time.time()
- project_json = to_project_json(list_projects)
- # log("json projects takes:%.3f"%(time.time()-_time))
- if b_log:
- log("project_json:%s"%project_json)
- return project_json,best_docid,list_merge_dump
- except Exception as e:
- raise RuntimeError("error on dumplicate")
- def is_exist_fingerprint(self,final_list,_docid,_fingerprint,is_tmp=False):
- set_fingerprint = set()
- for _i in range(1,len(final_list)):
- _dict = final_list[_i]
- b_docid = _dict[document_tmp_docid]
- _save = _dict.get(document_tmp_save,0)
- _status = _dict.get(document_tmp_status,0)
- if not is_tmp:
- if _status>=201 and _status<=300:
- _save = 1
- fingerprint_less = _dict.get(document_tmp_fingerprint,"")
- if b_docid==_docid:
- pass
- else:
- if _save==1:
- set_fingerprint.add(fingerprint_less)
- if _fingerprint in set_fingerprint:
- return True
- return False
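- # a document counts as an already-published duplicate when a visible document with a much
- # smaller docid (at least 400000 lower) carries the same fingerprint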
- def exists_normal_fingerprint(self,_fingerprint,docid,table_name="document",table_index="document_index"):
- query = BoolQuery(must_queries=[
- RangeQuery("status",201,301),
- TermQuery("fingerprint",_fingerprint),
- RangeQuery("docid",0,docid-400000),
- ]
- )
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(query,get_total_count=True,limit=1))
- if total_count>0:
- return True
- return False
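- # sanity-check page_time against the extracted time fields: dates more than 90 days before
- # page_time flag the doc as suspicious, and strict sources are rejected outright when the
- # bid-close time precedes the page_time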
- def check_page_time(self,item,table_name="document",table_index="document_index"):
- page_time = item.get(document_page_time,"")
- has_before = False
- has_after = False
- bidclose_time = page_time
- web_source_name = item.get(document_tmp_web_source_name,"")
- docchannel = item.get(document_tmp_docchannel,"0")
- try:
- docchannel = int(docchannel)
- except:
- docchannel = 0
- if docchannel<200:
- if len(page_time)>0:
- l_page_time = timeAdd(page_time,days=-90)
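- # l_page_time is the lower bound: extracted dates more than 90 days before
- # the publish date are treated as suspicious "before" values.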
- dict_time = item.get("dict_time",{})
- before_key,before_value = None,None
- for k,v in dict_time.items():
- if v is not None and len(v)>0:
- if l_page_time>v:
- has_before = True
- before_key,before_value = k,v
- if v>page_time:
- has_after = True
- if k==document_tmp_time_bidclose:
- bidclose_time = v
- set_web_source = {"中国招标投标公共服务平台","比地招标"}
- if web_source_name in set_web_source and bidclose_time<page_time:
- return False
- log("%s check page_time has_before %s has_after %s"%(str(item.get(document_docid)),str(has_before),str(has_after)))
- if has_before:
- _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
- must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
- if not has_after:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(_query,get_total_count=True,limit=1))
- if total_count>0:
- log("%s check page_time false %s==%s-%s"%(str(item.get(document_docid)),l_page_time,k,v))
- return False
- if item.get(document_web_source_name,"")=="中国政府采购网":
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
- SearchQuery(_query,get_total_count=True,limit=1))
- if total_count>0:
- log("%s check 中国政府采购网 false "%(str(item.get(document_docid))))
- return False
- return True
- def dumplicate_comsumer_handle_interface(self,docid,document_table,document_table_index,project_table,project_table_index,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=False,upgrade=False):
- result_dict = {"success":True}
- try:
- bool_query = BoolQuery(must_queries=[
- TermQuery("docid",docid)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(document_table,document_table_index,
- SearchQuery(bool_query,limit=1,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- if len(list_dict)==0:
- raise RuntimeError("未查找到docid为%s的数据"%(str(docid)))
- item = list_dict[0]
- self.post_extract(item)
- log("dumplicate start on:%s"%(str(item.get(document_tmp_docid))))
- base_list = []
- set_docid = set()
- list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=False,to_log=False,table_name=document_table,table_index=document_table_index)
- # print("len_rules",len(list_rules),table_name,table_index)
- list_rules.sort(key=lambda x:x["confidence"],reverse=True)
- log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
- list_rules = list_rules[:30]
- _i = 0
- step = 2
- item["confidence"] = 999
- if item.get(document_tmp_docid) not in set_docid:
- base_list.append(item)
- set_docid.add(item.get(document_tmp_docid))
- while _i<len(list_rules):
- must_not_q = []
- if len(base_list)>0:
- must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
- _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
- must_not_queries=must_not_q)
- _rule = list_rules[_i]
- confidence = _rule["confidence"]
- singleNum_keys = _rule["singleNum_keys"]
- contain_keys = _rule["contain_keys"]
- multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=b_log)
- _i += step
- _time = time.time()
- # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
- final_list = self.dumplicate_fianl_check(base_list,b_log)
- exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),is_tmp=table_name=="document_tmp")
- exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint),item.get(document_tmp_docid),table_name=table_name,table_index=table_index)
- # print("exist_normal_fingerprint",exist_normal_fingerprint)
- # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
- best_docid = self.get_best_docid(final_list)
- final_list_docid = [a["docid"] for a in final_list]
- # log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
- _d = {"partitionkey":item["partitionkey"],
- "docid":item["docid"],
- "status":random.randint(*flow_dumplicate_status_to),
- document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- }
- dtmp = Document_tmp(_d)
- dup_docid = set()
- for _dict in final_list:
- if _dict.get("update_document","")!="true":
- dup_docid.add(_dict.get(document_tmp_docid))
- if item.get(document_tmp_docid) in dup_docid:
- dup_docid.remove(item.get(document_tmp_docid))
- remove_list = []
- _unnormal = False
- dmp_docid = ""
- _check_time = self.check_page_time(item,table_name=table_name,table_index=table_index)
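- # Keep the document (save=1) only when the page_time sanity check passes, no
- # earlier published document shares its fingerprint, and it is either the only
- # candidate or the best docid of its duplicate group; documents flagged with
- # update_document=="true" are always kept.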
- if (_check_time and not exist_normal_fingerprint and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
- dtmp.setValue(document_tmp_save,1,True)
- # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- else:
- if exist_normal_fingerprint:
- log("%s has exist_normal_fingerprint"%(str(item.get(document_docid))))
- best_docid = -1
- dmp_docid = ""
- _unnormal = True
- if not _check_time:
- best_docid = -2
- dmp_docid = ""
- _unnormal = True
- dtmp.setValue(document_tmp_save,0,True)
- if best_docid in dup_docid:
- dup_docid.remove(best_docid)
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- else:
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- list_docids = list(dup_docid)
- # if item.get(document_update_document)=="true":
- # dtmp.setValue(document_tmp_save,1,True)
- list_merge_dump = []
- if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
- if exist_finterprint:
- log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
- dtmp.setValue(document_tmp_projects,"[]",True)
- else:
- project_json,merge_best_docid,list_merge_dump = self.merge_document_real(item,list_docids,dtmp.getProperties().get(document_tmp_save),document_name=document_table,project_table=project_table,project_table_index=project_table_index,b_log=b_log)
- if merge_best_docid is not None and (best_docid is None or best_docid==item.get(document_tmp_docid) or best_docid<0):
- best_docid = merge_best_docid
- if list_merge_dump is not None and len(list_merge_dump)>0 and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
- dtmp.setValue(document_tmp_save,0,True)
- if list_merge_dump is not None:
- dmp_docid = "%s,%s"%(dmp_docid,",".join([str(a) for a in list_merge_dump]))
- dtmp.setValue(document_tmp_projects,project_json,True)
- result_dict["projects"] = project_json
- log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
- dmp_docid = set([a for a in dmp_docid.split(",") if a!=""])
- if str(best_docid) in dmp_docid:
- dmp_docid.remove(str(best_docid))
- dmp_docid = ",".join([str(a) for a in list(dmp_docid)])
- result_dict["best_docid"] = str(best_docid) if best_docid is not None else ""
- result_dict["save"] = dtmp.getProperties().get("save")
- result_dict["dmp_docid"] = dmp_docid
- except Exception as e:
- result_dict["success"] = False
- result_dict["errmsg"] = str(e)
- return result_dict
- def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
- try:
- start_time = time.time()
- b_log = False if upgrade else True
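- # This handler mirrors dumplicate_comsumer_handle_interface above but runs
- # against the default tables; with upgrade=True it writes the result (save
- # flag, dup/best docids, merged projects) back to document_tmp, otherwise it
- # just returns the duplicate docids.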
- self.post_extract(item)
- log("dumplicate start on:%s"%(str(item.get(document_tmp_docid))))
- base_list = []
- set_docid = set()
- list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=b_log)
- # print("len_rules",len(list_rules),table_name,table_index)
- list_rules.sort(key=lambda x:x["confidence"],reverse=True)
- log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
- list_rules = list_rules[:30]
- _i = 0
- step = 2
- item["confidence"] = 999
- if item.get(document_tmp_docid) not in set_docid:
- base_list.append(item)
- set_docid.add(item.get(document_tmp_docid))
- while _i<len(list_rules):
- must_not_q = []
- if len(base_list)>0:
- must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
- _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
- must_not_queries=must_not_q)
- _rule = list_rules[_i]
- confidence = _rule["confidence"]
- singleNum_keys = _rule["singleNum_keys"]
- contain_keys = _rule["contain_keys"]
- multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=b_log)
- _i += step
- _time = time.time()
- # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
- final_list = self.dumplicate_fianl_check(base_list,b_log)
- exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),is_tmp=table_name=="document_tmp")
- exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint),item.get(document_tmp_docid))
- # print("exist_normal_fingerprint",exist_normal_fingerprint)
- # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
- best_docid = self.get_best_docid(final_list)
- final_list_docid = [a["docid"] for a in final_list]
- # log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
- _d = {"partitionkey":item["partitionkey"],
- "docid":item["docid"],
- "status":random.randint(*flow_dumplicate_status_to),
- document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- }
- dtmp = Document_tmp(_d)
- dup_docid = set()
- for _dict in final_list:
- if _dict.get("update_document","")!="true":
- dup_docid.add(_dict.get(document_tmp_docid))
- if item.get(document_tmp_docid) in dup_docid:
- dup_docid.remove(item.get(document_tmp_docid))
- remove_list = []
- _unnormal = False
- dmp_docid = ""
- _check_time = self.check_page_time(item)
- if (_check_time and not exist_normal_fingerprint and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
- dtmp.setValue(document_tmp_save,1,True)
- # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- else:
- if exist_normal_fingerprint:
- log("%s has exist_normal_fingerprint"%(str(item.get(document_docid))))
- best_docid = -1
- dmp_docid = ""
- _unnormal = True
- if not _check_time:
- best_docid = -2
- dmp_docid = ""
- _unnormal = True
- dtmp.setValue(document_tmp_save,0,True)
- if best_docid in dup_docid:
- dup_docid.remove(best_docid)
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- else:
- dmp_docid = ",".join([str(a) for a in list(dup_docid)])
- for _dict in final_list:
- if _dict.get(document_tmp_docid) in dup_docid:
- remove_list.append(_dict)
- list_docids = list(dup_docid)
- # if item.get(document_update_document)=="true":
- # dtmp.setValue(document_tmp_save,1,True)
- list_merge_dump = []
- if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
- if exist_finterprint:
- log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
- dtmp.setValue(document_tmp_projects,"[]",True)
- else:
- project_json,merge_best_docid,list_merge_dump = self.merge_document_real(item,list_docids,dtmp.getProperties().get(document_tmp_save),b_log=b_log)
- if merge_best_docid is not None and (best_docid is None or best_docid==item.get(document_tmp_docid) or best_docid<0):
- best_docid = merge_best_docid
- if list_merge_dump is not None and len(list_merge_dump)>0 and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
- dtmp.setValue(document_tmp_save,0,True)
- if list_merge_dump is not None:
- dmp_docid = "%s,%s"%(dmp_docid,",".join([str(a) for a in list_merge_dump]))
- dtmp.setValue(document_tmp_projects,project_json,True)
- log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
- dmp_docid = set([a for a in dmp_docid.split(",") if a!=""])
- if str(best_docid) in dmp_docid:
- dmp_docid.remove(str(best_docid))
- dmp_docid = ",".join([str(a) for a in list(dmp_docid)])
- if _unnormal:
- dmp_docid = ""
- if upgrade:
- # print(dtmp.getProperties())
- dmp_docid = dmp_docid.replace(",,",",")
- dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
- dtmp.setValue(document_tmp_best_docid,best_docid,True)
- _flag = dtmp.update_row(self.ots_client)
- if not _flag:
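- # The write most likely fails because the projects JSON exceeds the OTS
- # column size limit (an assumption based on this retry strategy): halve the
- # project list up to 10 times until the row fits.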
- for i in range(10):
- list_proj_json = dtmp.getProperties().get(document_tmp_projects)
- if list_proj_json is not None:
- list_proj = json.loads(list_proj_json)
- dtmp.setValue(document_tmp_projects,json.dumps(list_proj[:len(list_proj)//2]),True)
- if dtmp.update_row(self.ots_client):
- break
- self.changeSaveStatus(remove_list)
- self.changeSaveStatus(list_merge_dump)
- else:
- return list_docids
- except Exception as e:
- traceback.print_exc()
- log("dumplicate error on:%s"%(str(item.get(document_tmp_docid))))
- finally:
- log("dumplicate end on:%s"%(str(item.get(document_tmp_docid))))
- self.queue_dumplicate_processed.put(item.get(document_tmp_docid))
- def fix_doc_which_not_in_project(self):
- '''
- Pull published announcements that are missing from project2 and push them back
- into document_tmp so they go through deduplication and merging again
- :return:
- '''
- def fix_doc_handle(item,result_queue):
- _docid = item.get(document_tmp_docid)
- b_q = BoolQuery(must_queries=[TermQuery(project_docids,str(_docid))])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
- SearchQuery(b_q,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count==0:
- log("fix_doc:%s not in project2"%(str(_docid)))
- d_tmp = Document_tmp(item)
- d_tmp.setValue(document_tmp_status,flow_dumplicate_status_from[0],True)
- d_tmp.update_row(self.ots_client)
- current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-20)
- after_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
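- # Only rows whose opertime falls between 20 and 5 minutes ago are re-checked;
- # the newest 5 minutes are skipped, presumably so in-flight dedup/merge
- # writes can settle first.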
- if self.fix_doc_docid is None:
- bool_query = BoolQuery(must_queries=[
- TermQuery(document_tmp_save,1),
- RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
- RangeQuery(document_tmp_docchannel,0,300),
- RangeQuery(document_tmp_opertime,before_date,after_date)
- ])
- else:
- bool_query = BoolQuery(must_queries=[
- TermQuery(document_tmp_save,1),
- RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
- RangeQuery(document_tmp_docchannel,0,300),
- RangeQuery(document_tmp_docid,self.fix_doc_docid),
- RangeQuery(document_tmp_opertime,before_date,after_date)
- ])
- list_data = []
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_d = getRow_ots(rows)
- list_data.extend(list_d)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_d = getRow_ots(rows)
- list_data.extend(list_d)
- print("%d/%d"%(len(list_data),total_count))
- if len(list_data)>0:
- self.fix_doc_docid = list_data[-1].get(document_tmp_docid)
- log("current fix_doc_docid:%s"%(str(self.fix_doc_docid)))
- task_queue = Queue()
- for _data in list_data:
- task_queue.put(_data)
- mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
- mt.run()
- def send_daily_check_data(self):
- import datetime
- def get_download_url(bucket, ObjectName, timeout):
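- # Returns a presigned OSS GET url valid for `timeout` seconds via
- # bucket.sign_url; the three attempts guard against transient SDK errors.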
- url = ""
- exist = bucket.object_exists(ObjectName)
- if exist:
- get_url = False
- for i in range(3):
- try:
- url = bucket.sign_url('GET', ObjectName, timeout)
- url = url.replace("-internal", "") # 替换地址里的内网标识
- get_url = True
- except:
- pass
- if get_url:
- break
- return url
- file_timeout = 60 * 60 * 24 * 5 # download links stay valid for 5 days
- # yesterday's date
- date = str(datetime.date.today() - datetime.timedelta(days=1))
- oss_path = 'tmp_document_quality_data/'
- object_path = oss_path + date + '/'
- msg = "每日数据质量检查结果(报警):"
- csv_name = "数据质量监控检查结果.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket,ObjectName,file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- csv_name = "公告重复量大的编号.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket, ObjectName, file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- csv_name = "公告附件重复量大的编号.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket, ObjectName, file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- csv_name = "附件识别异常的站源.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket, ObjectName, file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- csv_name = "报名时间,截止时间在发布时间之前的公告.xlsx"
- ObjectName = object_path + csv_name
- url = get_download_url(self.bucket, ObjectName, file_timeout)
- if url:
- msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
- atMobiles = ['18813973429'] # 维阵
- ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
- sentMsgToDD(msg,ACCESS_TOKEN_DATAWORKS,atMobiles=atMobiles)
- def send_daily_check_data2(self):
- import datetime
- import pandas as pd
- from itertools import groupby
- dict_channel = {"公告变更": 51,
- "招标公告": 52,
- "中标信息": 101,
- "招标预告": 102,
- "招标答疑": 103,
- "资审结果": 105,
- "法律法规": 106,
- "新闻资讯": 107,
- "采购意向": 114,
- "拍卖出让": 115,
- "土地矿产": 116,
- "产权交易": 117,
- "废标公告": 118,
- "候选人公示": 119,
- "合同公告": 120}
- label2channel = {v:k for k,v in dict_channel.items()}
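- # label2channel maps the numeric original_docchannel stored in the sheets
- # back to its channel name for the report payload.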
- def post_data(url,json_data):
- post_success = False
- for i in range(3):
- if not post_success:
- try:
- # send a POST request carrying the JSON payload
- response = requests.post(url, json=json_data)
- # check the response status code
- if response.status_code == 200:
- post_success = True
- except requests.exceptions.RequestException as e:
- log("send_daily_check_data2,post error reason: %s"%(str(e)))
- return post_success
- res_json = {
- "data": [],
- "count": 0
- }
- # yesterday's date
- date = str(datetime.date.today() - datetime.timedelta(days=1))
- oss_path = 'tmp_document_quality_data/'
- object_path = oss_path + date + '/'
- csv_name = "数据质量监控检查结果.xlsx"
- ObjectName = object_path + csv_name
- LocalPath = os.path.join(self.current_path,"download",csv_name)
- down_res = downloadFile(self.bucket,ObjectName,LocalPath,retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- for web_source_no,original_docchannel,error_rule in zip(df['web_source_no'],df['original_docchannel'],df['error_rule']):
- error_rule = json.loads(error_rule)
- for error_type,error_sample in error_rule.items():
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": error_type,
- "ITEMS": error_sample
- }
- res_json['data'].append(tmp_data)
- res_json['count'] += 1
- os.remove(LocalPath)
- csv_name = "公告重复量大的编号.xlsx"
- ObjectName = object_path + csv_name
- down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- tmp_list = []
- for web_source_no,fingerprint,original_docchannel,cnt,res in zip(df['web_source_no'], df['fingerprint'],
- df['original_docchannel'],df['cnt'],df['res']):
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": "编号公告重复",
- "FINGERPRINT": fingerprint,
- "ITEMS": json.loads(res)
- }
- tmp_list.append(tmp_data)
- tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
- for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
- group = list(group)[:5]
- res_json['data'].extend(group)
- res_json['count'] += len(group)
- os.remove(LocalPath)
- csv_name = "公告附件重复量大的编号.xlsx"
- ObjectName = object_path + csv_name
- down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- tmp_list = []
- for web_source_no,filemd5,original_docchannel,cnt,res in zip(df['web_source_no'],df['filemd5'],
- df['original_docchannel'],df['cnt'],df['res']):
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": "编号附件重复",
- "FILEMD5": filemd5,
- "ITEMS": json.loads(res)
- }
- tmp_list.append(tmp_data)
- tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
- for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
- group = list(group)[:5]
- res_json['data'].extend(group)
- res_json['count'] += len(group)
- os.remove(LocalPath)
- csv_name = "附件识别异常的站源.xlsx"
- ObjectName = object_path + csv_name
- down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- for web_source_no,original_docchannel,error_ratio,error_sample,res in zip(df['web_source_no'], df['original_docchannel'],
- df['error_ratio'],df['error_sample'],df['res']):
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": "附件识别异常",
- "ITEMS": json.loads(res)
- }
- res_json['data'].append(tmp_data)
- res_json['count'] += 1
- os.remove(LocalPath)
- csv_name = "报名时间,截止时间在发布时间之前的公告.xlsx"
- ObjectName = object_path + csv_name
- down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
- if down_res:
- df = pd.read_excel(LocalPath)
- tmp_list = []
- for web_source_no,original_docchannel,res in zip(df['web_source_no'],df['original_docchannel'],df['res']):
- tmp_data = {
- "WEB_SOURCE_NO": web_source_no,
- "WEBTYPE": label2channel.get(original_docchannel, ""),
- "TYPE": "截止日期在发布日期之前",
- "ITEMS": json.loads(res)
- }
- tmp_list.append(tmp_data)
- res_json['data'].extend(tmp_list)
- res_json['count'] += len(tmp_list)
- os.remove(LocalPath)
- # url = "http://120.132.118.205:17090/saveQualityListData"
- url = "http://data-monitor.bidizhaobiao.com/oldApi/saveQualityListData"
- res = post_data(url,res_json)
- if res:
- log("send_daily_check_data2,sent data len: %d"%(res_json['count']))
- def start_flow_dumplicate(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
- schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
- schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
- schedule.add_job(self.flow_remove,"cron",hour="20")
- schedule.add_job(self.send_daily_check_data,"cron",hour='9', minute='10')
- schedule.add_job(self.send_daily_check_data2,"cron",hour='9', minute='10')
- schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
- schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
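- # BlockingScheduler.start() blocks the calling thread, so this method never
- # returns; it is the long-running entry point of the dedup worker.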
- schedule.start()
- def changeSaveStatus(self,list_dict):
- if list_dict is not None:
- for _dict in list_dict:
- if isinstance(_dict,dict):
- if _dict.get(document_tmp_save,1)==1:
- _d = {"partitionkey":_dict["partitionkey"],
- "docid":_dict["docid"],
- document_tmp_save:0
- }
- _d_tmp = Document_tmp(_d)
- if _d_tmp.exists_row(self.ots_client):
- _d_tmp.update_row(self.ots_client)
- elif isinstance(_dict,int):
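- # A bare docid carries no partitionkey, so it is rederived the same way it
- # is built elsewhere in this file: docid % 500 + 1.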
- _d = {"partitionkey":_dict%500+1,
- "docid":_dict,
- document_tmp_save:0
- }
- _d_tmp = Document_tmp(_d)
- if _d_tmp.fix_columns(self.ots_client,["status",document_update_document],True):
- if _d_tmp.getProperties().get("status")==1:
- if _d_tmp.getProperties().get(document_update_document,"")!="true":
- _d_tmp.setValue("status",0,True)
- _d_tmp.update_row(self.ots_client)
- def test_dumplicate(self,docid):
- # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
- columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]
- item = self.get_attrs_before_dump(docid,columns)
- if item:
- log("start dumplicate_comsumer_handle")
- self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=True)
- return
- def test_merge(self,list_docid_less,list_docid_greater):
- list_docs_less = self.search_docs(list_docid_less)
- list_projects_less = self.generate_projects_from_document(list_docs_less)
- list_docs_greater = self.search_docs(list_docid_greater)
- list_projects_greater = self.generate_projects_from_document(list_docs_greater)
- list_projects_less.extend(list_projects_greater)
- list_projects = dumplicate_projects(list_projects_less,b_log=True)
- project_json = to_project_json(list_projects)
- log("project_json:%s"%project_json)
- return project_json
- def getRemainDoc(self,docid):
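- # Replays the duplicate search for a single published document and returns
- # the docid that would survive deduplication, without writing anything back.
- # Illustrative call: remain_docid = df_dump.getRemainDoc(613075691)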
- columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
- bool_query = BoolQuery(must_queries=[
- TermQuery("docid",docid)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- if len(list_dict)>0:
- item = list_dict[0]
- start_time = time.time()
- self.post_extract(item)
- base_list = []
- set_docid = set()
- list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,to_log=True)
- list_rules.sort(key=lambda x:x["confidence"],reverse=True)
- _i = 0
- step = 5
- item["confidence"] = 999
- if item.get(document_tmp_docid) not in set_docid:
- base_list.append(item)
- set_docid.add(item.get(document_tmp_docid))
- while _i<len(list_rules):
- must_not_q = []
- if len(base_list)>0:
- must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
- _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
- must_not_queries=must_not_q)
- _rule = list_rules[_i]
- confidence = _rule["confidence"]
- singleNum_keys = _rule["singleNum_keys"]
- contain_keys = _rule["contain_keys"]
- multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
- _i += step
- _time = time.time()
- log("%d start final check with length:%d"%(item["docid"],len(base_list)))
- final_list = self.dumplicate_fianl_check(base_list)
- log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
- best_docid = self.get_best_docid(final_list)
- return best_docid
- return None
- def compare_dumplicate_check():
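- # Offline A/B check: run the dedup handler twice over the same docid sample
- # (default rules vs check_rule=2) and write the per-docid set differences of
- # the returned duplicates to compare_dump.xlsx.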
- import pandas as pd
- df_dump = Dataflow_dumplicate(start_delete_listener=False)
- test_count = 1000
- # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
- columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]
- bool_query = BoolQuery(must_queries=[
- RangeQuery("docid",400453395,400463395)
- ])
- rows,next_token,total_count,is_all_succeed = df_dump.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=10,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- log("flow_dumplicate producer total_count:%d"%total_count)
- list_dict = getRow_ots(rows)
- while 1:
- if not next_token or len(list_dict)>=test_count:
- break
- rows,next_token,total_count,is_all_succeed = df_dump.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_dict.extend(getRow_ots(rows))
- def _handle1(_item,result_queue):
- try:
- list_docid = df_dump.dumplicate_comsumer_handle(_item,None,df_dump.ots_client,get_all=True,upgrade=False)
- _item["before"] = list_docid
- except Exception as e:
- pass
- dump_result = {}
- for item in list_dict:
- dump_result[item["docid"]] = {}
- task_queue = Queue()
- list_item = []
- for item in list_dict:
- _item = {}
- _item.update(item)
- list_item.append(_item)
- task_queue.put(_item)
- mt = MultiThreadHandler(task_queue,_handle1,None,30)
- mt.run()
- for item in list_item:
- dump_result[item["docid"]]["before"] = item.get("before")
- df_dump.check_rule = 2
- def _handle2(_item,result_queue):
- try:
- list_docid1 = df_dump.dumplicate_comsumer_handle(_item,None,df_dump.ots_client,get_all=True,upgrade=False)
- _item["after"] = list_docid1
- except Exception as e:
- pass
- task_queue = Queue()
- list_item = []
- for item in list_dict:
- _item = {}
- _item.update(item)
- list_item.append(_item)
- task_queue.put(_item)
- mt = MultiThreadHandler(task_queue,_handle2,None,30)
- mt.run()
- for item in list_item:
- dump_result[item["docid"]]["after"] = item.get("after")
- df_data = {"docid":[],
- "before":[],
- "after":[],
- "before-after":[],
- "after-before":[]}
- for docid,_d in dump_result.items():
- df_data["docid"].append(docid)
- before = _d.get("before",[])
- after = _d.get("after",[])
- df_data["before"].append(str(before))
- df_data["after"].append(str(after))
- df_data["before-after"].append(str(set(before)-set(after)))
- df_data["after-before"].append(str(set(after)-set(before)))
- df = pd.DataFrame(df_data,columns=["docid","before","after","before-after","after-before"])
- df.to_excel("compare_dump.xlsx")
- def fix_merge_docid(docid):
- # create the client at function scope: the status updates and project
- # deletes at the bottom of this function need it too, not just the helper
- ots_client = getConnect_ots()
- def get_uuid_docids(docid):
- bool_query = BoolQuery(must_queries=[
- TermQuery("docids",docid)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
- ColumnsToGet(["docids"],return_type=ColumnReturnType.SPECIFIED))
- list_row = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(["docids"],return_type=ColumnReturnType.SPECIFIED))
- list_row.extend(getRow_ots(rows))
- return list_row
- def get_new_docid(list_docid1,list_docid2):
- return list(set(list_docid1)-set(list_docid2))
- def get_list_docid(list_row):
- list_docid = []
- for row in list_row:
- docids = row.get("docids",'')
- if docids:
- list_docid.extend([int(a) for a in docids.split(",")])
- return list(set(list_docid))
- def get_list_uuid(list_row):
- list_uuid = []
- for row in list_row:
- uuid = row.get("uuid",'')
- if uuid:
- list_uuid.append(uuid)
- return list(set(list_uuid))
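- # Transitive closure crawl: starting from one docid, keep pulling every
- # project that mentions any newly seen docid until no unseen docids remain,
- # then reset the collected documents to status=1 and delete the collected
- # projects so they can be regenerated.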
- list_row = get_uuid_docids(docid)
- print(list_row)
- list_docid1 = get_list_docid(list_row)
- list_new_docid = get_new_docid(list_docid1,[docid])
- while 1:
- if len(list_new_docid)==0:
- break
- list_row2 = []
- for _docid in list_new_docid:
- list_row2.extend(get_uuid_docids(_docid))
- list_docid1 = get_list_docid(list_row)
- list_docid2 = get_list_docid(list_row2)
- # new docids are the ones appearing in the freshly fetched rows but not yet collected
- list_new_docid = get_new_docid(list_docid2,list_docid1)
- list_row.extend(list_row2)
- list_uuid = get_list_uuid(list_row)
- list_docid = get_list_docid(list_row)
- print(list_uuid)
- print(list_docid)
- for _docid in list_docid:
- _d = Document({document_partitionkey:_docid%500+1,
- document_docid:_docid,
- document_status:1})
- if _d.exists_row(ots_client):
- _d.update_row(ots_client)
- for _uuid in list_uuid:
- _p = Project({project_uuid:_uuid,})
- _p.delete_row(ots_client)
- if __name__ == '__main__':
- a = time.time()
- # df = Dataflow()
- # df.flow_init()
- # df.flow_test()
- # df.test_merge()
- # df.start_flow_attachment()
- # df.start_flow_extract()
- # df.start_flow_dumplicate()
- # # df.start_flow_merge()
- # df.start_flow_remove()
- # download_attachment()
- # test_attachment_interface()
- df_dump = Dataflow_dumplicate(start_delete_listener=False)
- # df_dump.start_flow_dumplicate()
- df_dump.test_dumplicate(613075691)
- # df_dump.dumplicate_comsumer_handle_interface(603504420,document_table="document_0000",document_table_index="document_0000_index",project_table="project_0000",project_table_index="project_0000_index_formerge")
- # compare_dumplicate_check()
- # df_dump.test_merge([391898061],[371551361,])
- # df_dump.flow_remove_project_tmp()
- # fix_merge_docid(595271944)
- print("takes",time.time()-a)
- # df_dump.fix_doc_which_not_in_project()
- # df_dump.delete_projects_by_document(16288036)
- # log("=======")
- # for i in range(3):
- # time.sleep(20)
- #
- # a = {"docid":74295123}
- # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)
|