|
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.dataSource.source import *
- from BaseDataMaintenance.common.Utils import *
- from queue import Queue
- from tablestore import *
- from BaseDataMaintenance.model.ots.Preproject import *
- from uuid import uuid4
- import csv
- log_purchaseIntentionProcess_opertime_path = "/home/appuser/python/purchaseIntention_process_opertime.log"
- # 医疗产品 关键词
- def get_medical_product_list():
- # path = "work/medical_product_keyword.csv"
- path = os.path.dirname(__file__) + '/medical_product_keyword.csv'
- with open(path, 'r',encoding='utf-8') as f:
- reader = csv.reader(f)
- row_list = []
- for r in reader:
- row_list.append(r)
- row_list = row_list[1:]
- product_word_list = []
- for row in row_list:
- for word in row:
- if word:
- word = re.sub("\s|\|$","",word)
- product_word_list.append(word)
- re_medical_product = "|".join(list(set(product_word_list)))
- re_medical_product = re_medical_product.replace("(","\(")
- re_medical_product = re_medical_product.replace(")","\)")
- return re_medical_product
- # 行业分类 关键词
- def get_product_classifier_dict():
- # path = "work/preproject_product_keyword.csv"
- path = os.path.dirname(__file__) + '/preproject_product_keyword.csv'
- with open(path, 'r',encoding='utf-8') as f:
- reader = csv.reader(f)
- row_list = []
- for r in reader:
- row_list.append(r)
- row_list = row_list[1:]
- product_classifier_dict = dict()
- for row in row_list:
- keywords = row[4]
- _class = row[6]
- if _class:
- if _class not in product_classifier_dict:
- product_classifier_dict[_class] = []
- product_classifier_dict[_class].extend(keywords.split(","))
- else:
- product_classifier_dict[_class].extend(keywords.split(","))
- for key,value in product_classifier_dict.items():
- value = [word for word in value if word]
- value = list(set(value))
- product_classifier_dict[key] = "|".join(value)
- return product_classifier_dict
- # 服务类产品行业分类
- service_key_word = {
- "物业管理":"物业化?管理|物业服务|物管服务|物业综合|物业托管|物业维修|物业委托|物业$|宿舍管理|公寓管理|物业费|物业保洁|物业外包|物业项目|物业采购|物业业务|物业合同|后勤管理|后勤综合|后勤服务|后勤保障|后勤社会化",
- "保安服务":"保安服务|保安人员|保安管理|保安$|安保服务|安全保卫|秩序维护|保安劳务|(采购|雇佣|选聘|招聘|招募|聘用|聘请)保安|保安(费|经费|工资|支出|外包|保洁费|管护费|人员经费)|保安(项目|合同|业务|采购|比选)|保安人员工程|保安公司(比选|入围|招聘)|保安大厦|保安工作|保安派遣|安保劳务派遣|驻.{1,2}保安|保安工程|保安人员采购|(?:雇佣|选聘|招聘|招募|聘用|聘请)[^。;;司]+保安|保安人员项目|安保人员|治安保卫|安保管理|安保外包|车辆管理|门卫服务|安保",
- "保洁服务":"保洁服务|清洁服务|清洗服务|清扫服务|清洁卫生|保洁外派|保洁项目|保洁费|聘请专业保洁|保洁劳务|垃圾清运|环卫保洁|清洗保洁|保洁对外承包|清扫保洁(?!大队)|保洁员?工资|道路保洁|保洁劳务外包|保洁外包|保洁工程|保洁采购|区域养护保洁|扬尘治理|河道保洁|保洁公司项目|后勤保洁|垃圾收运|卫生保洁|垃圾清运人员劳务|保洁合同|保洁专业委托|保洁及维修人员|[房楼室]保洁|日常保洁|保洁招标|弃物清运|环境保洁|[公道]路养护|保洁业务(外包)?|[室户]内保洁|室外保洁|公厕保洁|乡道保洁|校园保洁|保洁管理费?|保洁发包|河道管护|人工保洁|机械保洁|公园保洁|常规保洁|保洁消杀|区域保洁|保洁工作承包|社区保洁|绿地保洁|绿化保洁|保洁管护|保洁政府采购|精细化保洁|保洁承包|保洁养护|保洁社会化|保洁市场化|保洁作业|保洁人员采购|养护保洁|保洁专项委托|保洁工资|聘用保洁人员|便道保洁|消杀|消毒杀菌|生物防治|除四害|助洁服务",
- "绿化养护":"绿化养护|绿化服务|绿植服务|绿植养护|绿化合同|道路绿化|绿地养护|绿化维护|绿化生态修复|绿化养管维护|绿化面积养护|绿化提升|绿化管护|绿化管养|绿化施工|绿化品质提升|绿化管理|绿化及养护|绿化景观|绿化日常维护|绿化专项养护|绿化保洁一体化|绿化人工劳务|园林绿化$|绿化采购|绿化(?:.{1,2}期)?项目|绿化工程|绿化管养|绿化建设|绿化防寒|绿化完善|绿化工?外包|景观绿化|绿化美化|绿化订购|绿化劳务|绿化应急|绿化补植|绿化面积管护|绿化管理日常维护|绿化人员劳务|绿化环境|绿化种植|绿化植树|绿化日常养护|绿化植物养护|绿化搬迁|绿[化地]社会化(承包|养护|管理)|绿化栽植|绿化整治|绿化等养护|绿地管护|绿化提质|绿化工作|绿[化地]市场化(承包|养护|管理)|绿化修补|绿化迁移|绿化年度养护|绿化改造|绿化补栽|绿化临时用工|绿化$|绿化养管|绿化维保|绿化修剪|造林",
- "运营维护":"运营维护|运维服务|运营项目|驻场运维服务|运营管理|托管运营|合作运营|运营合作|运营承包|运营投资|运营合作管理|委托运营|整体运营|第三方运营|运营及管理",
- "食堂承包":"食堂承包|食堂餐饮服务|餐饮外包|餐厅外包|餐饮服务|餐厅服务|食堂服务|餐饮管理服务|食堂服务外包|食堂经营服务|食堂经营权|食堂委托管理|食堂[业服]务外包|食堂主副食供|食堂劳务外包|食堂托管运营|食堂劳务承包|食堂劳务用工|食堂供餐|食材定点供应|餐厅遴选|食堂经营合作|引进社会餐饮|经营食堂|食堂厨师团队|食堂及超市对外承包|食堂托管劳务|食堂餐饮准入资格|食堂配送资格|食堂后厨外包|餐厅经营|食堂对外承包|食堂超市招租|食堂厨师外包|食堂托管经营|食堂劳务(政府)?采购|食堂(及生活超市)?经营权|餐厅承包经营|食堂引入餐饮公司|食堂(和餐厅)?公开招标|食堂委托|食堂及超市经营权|食堂管理运营|食堂.{0,2}承租|食堂合作经营|食堂委托经营|食堂(经营区域)?商户遴选|食堂目标责任制管理经营|食堂整体承包|食堂外包|食堂经营资质|食堂管理采购|食堂经营管理|食堂招标项目|食堂托管费|食堂对外出租经营|食堂委托运营|食堂招租|食堂经营项目|食堂投资经营权|食堂劳务|食堂运营|食堂托管|食堂经营承包|食堂管理费|餐厅经营权|餐饮技术合作|食堂采购$|食堂工程|食堂合同|餐厅委托经营|食堂社会化管理|食堂运行|食堂采购项目|食堂餐饮管理|食堂管理|餐厅经营管理|食堂、商店运营招租|食堂厨务管理|食堂员工劳务|食堂外委经营|食堂餐饮采购|食堂管理社会化|食堂供应商招标|食堂代理|食堂招标采购|食堂运营承包|食堂后厨承包|食堂费|食堂.{0,3}经营权|食堂人员外包|食堂聘用第三方外包|食堂.{0,2}经营者遴选|食堂整体委托运营|食堂承作|食堂招聘餐饮公司|食堂经营承包权|食堂、小卖部对外招租|配餐服务",
- "食材配送":"食材等?配送|食材(政府)?采购(及?配送)?|食堂物资配送|餐饮配送|食堂配送|食[材品]供[货应给]|食材招标|食材定点采购|食材定点配送|食品配送|食材(批量)?集中配送|食材(批量)?集中采购|食材集采|食材集配|食材采配|食材项目|食材[^,。;;]{0,25}配送|食材中标采购|食材[^,。;;]{0,10}供[应货]商|食材原料配送|食材统一?配|食材需求采购|食材批量采购|食材批量配送|食材外包配送|配送食材费|食材框架|食材定点供[应货]|食材年度(定点)?采购|食材合同|食材招投标|副食配送|食材代采购|食材(食品)?[^,。;;]{0,10}(采购|配送)|食材综合配送|食材年度供[应货]商|食材集成供[应货]商|配送食材|采购食材|食材服务商|食材类供[应货]商|食材工程|[主副]食原?材料(招标|采购|供应|供货)|食堂(大宗)?物资|食堂原材?料(采购|购买|配送)|副食|食品批发|饮料批发|食堂食材",
- "环卫服务":"环卫服务|环境卫生服务|环卫保洁|环卫一体化|环卫市场化|环卫.{0,1}绿化一体化|环卫作业|环卫清扫|道路清扫|清雪服务|垃圾清运|环卫外包|环卫日常养护|环卫整体运营|环卫护路一体化|环卫(综合)?提升|环卫统筹|环卫项目|环卫管理承包|环卫采购|环卫工作市场化|环卫管理劳务|环境卫生作业|环卫养护|道路环卫|环卫、垃圾分类|环卫人员劳务|环卫园林一体化|环卫清运|环卫垃圾清运|劳务发包|劳务业务外包|(?:雇佣|选聘|招聘|招募|聘用|聘请)劳务人员",
- "劳务派遣":"劳务派遣|人力劳务[分外]包|劳务服务|劳务外包|劳务费|劳务承包|劳务用工|劳务合作|劳务采购|外包劳务机构|劳务承揽|劳务人员派遣|雇佣劳务公司|劳务集中采购|劳务入库|劳务市场化|劳务管理|劳务专业外包|劳务项目|劳务招标|劳务招聘|劳务协作|劳务社会化|外委劳务|劳务承作",
- "布草洗涤":"洗涤服务|布草类?洗涤|织物洗涤|工[装衣服]洗涤|工作服洗涤|棉织品洗涤|洗涤采购|洗涤项目|被服洗涤|制服洗涤|布类洗涤|衣物洗涤|棉制品洗涤|洗涤费|洗涤(操作)?业务外包|洗涤外包|洗涤合同|洗涤全?承包|医疗洗涤|洗涤业务|租赁洗涤一体化|洗涤消毒|卧具洗涤|洗涤劳务外包|布草类?清洗|洗衣服务|(服装|被服|制服|布草)清洗"
- }
- product_key_word = {
- "服装":"服装|被[服装]|服饰|工作?[装衣服]|棉服|校服|制服|职业装|被褥|被单|被套|床单|床罩|针织品|棉织品|纺织品|校服|军训服|西[服装]|织物|床上用品|文化衫|棉被|大衣|病[人号]?服|棉衣|执勤服|登山服|警服|窗帘|院服|T恤|[男女东夏]装|床垫|劳保用品|比赛服|防护服|衬衫|春秋常服|防寒服|防寒装备|消防服|团服|运动服|包布|棉麻制品|训练服|囚服|球服",
- "电器":"电器|家电|家用电器|空调|冰箱|热水器|净水器|风扇|电视|冰柜|冷[藏冻]箱|[抽除]湿机|洗碗机|饮水机|显示器|扩音设备|音响|洗衣机|电饭煲|电磁炉|烤箱|取暖器|干衣机|吸尘器",
- "办公用品":"办公室?用品|办公室?设备|办公耗材|办公消耗品|打印[机纸]|复印[机纸]|办公文具|电脑|计算机|显示[屏器]|多功能一体机|硒鼓|扫描仪|投影仪|碎纸机|办公用纸|[aA][34]纸|教学设备|触控一体机|速印机|墨粉|档案用品|文件夹|档案盒|档案袋|文件袋|桌面用品|打孔机|笔筒|胶带|尺子|账本|单据|发票|收据|印章|墨盒|色带|粉盒|公文柜|剪刀|胶条|胶棒|橡皮擦|回形针|直尺|美工刀|传真机|笔记本|[Ll][Ee][Dd](显示)?[屏器]|台式机|色鼓|会议设备|财务用品|丝印机|彩机纸|彩色卡纸|装订机|过塑机|电话机|摄像机|路由器|办公消耗用?品|办公纸|订书机|胶水|签字笔|计算器|粉笔|票夹|长尾夹|鼠标|键盘|硬盘|办公设施|U盘|电源|打印复印一体机|文件盒|起钉器|多媒体设备|打印机?耗材|中性笔|台式整机|印刷耗材",
- "家具":"家具|办公家具|课[桌椅]|柜类|密集架|桌椅|办公椅|文件柜|沙发|公寓床|保密柜|书桌|椅子|实木家具|书柜|组合床|挂镜柜|办公桌|会议桌|木床|储[物藏]柜|宿舍床|保险柜|座椅|餐桌|午休床|书包柜|床头柜|学生床|实验凳|整理柜|防潮柜|铁架床|橱柜|碗橱|吸油烟机|会议椅|椅凳|茶水柜|柜$|书架",
- "印刷服务":"印务|印刷服务|印刷|彩印|印刷品|印刷费|编印|印制|出版服务|宣传品|文印服务|打印服务",
- "法律服务":"法[律务](专项)?[服业事]务|法律顾问|法律咨询|法律援助|诉讼代理|代理诉讼|合同审查|知识产权保护|知识产权服务|法律文书|法律风险评估|合规咨询|调解.?仲裁|法律培训|法律信息服务|律师|案件代理|调解服务|司法辅助|诉讼(专项)?[服业]务|法律支持|法律追偿|司法鉴定|纠纷调解|法律文件代理|法律调查|法律尽职调查|仲裁代理|司法[服业事]务|案件(协助)?调查|行政诉讼|公证服务|法律公证",
- "环保服务":"环境服务|环境治理|环境整治|环境(质量)?[检监]测|[污尾]水[治处]理|综合治理|污染防治|废物处置|环境影响评[价估]|污染源监测|排污口排查|水污染治理|土地整治|生态监测|生态保护|环保咨询|环保工程|生态修复|新能源技术|环境影响评价|环境风险评估|废[水气]处理|噪声控制|废弃物资源化利用|再生资源回收利用|废弃资源综合利用|水土保持|土壤污染状况调查|废弃物回收处理",
- "照明服务":"照明服务|照明工程|照明亮化|亮化工程|路灯|灯具|灯饰|道路照明|夜景照明|景观照明|展示照明|广告牌?照明|家居照明|智能照明|办公照明|照明(方案|施工图)?设计|照明设备的?安装|景观灯|定制灯具|照明改造|建筑照明|桥梁照明|照明系统|灯光照明|照明(采购|项目)|路灯亮化|照明升级|照明设施维护|路灯建设|灯光改造|路灯安装|灯具安装|照明灯|护眼灯|路灯养护|路灯配套|照明设备|亮化建设|泛光照明|灯光工程|^[\u4e00-\u9fa5]{1,3}灯$|[Ll][Ee][Dd]灯",
- "广告服务":"广告|宣传|推广|媒体投放|营销策划|广告业",
- "财税服务":"财税服务|财务服务|税务服务|审计|会计|记账|税务代理|工商代理|资产评估|资产盘点|资产[\u4e00-\u9fa5]{1,2}清查|财务咨询|财务评估|财务顾问|竣工决算|财务决算|涉税评估|财务清理|结算审核",
- "金融服务":"保险|金融服务|车辆?险|责任险|财险|资金存放|[债证]券承销|融资券|^[\u4e00-\u9fa5]{1,3}险$|意外险|医疗险|伤害险|灾险|不动产险|农业险|综合险|寿险|财产险|分红险|存放银行|融资咨询|信用评[价级]",
- "车辆服务":"车辆?[))】\]]?(定点)?(服务|购买|询价|采购|租|维修|保养|维保|购置|更新|更换|运行维护|加油|保险|配件)|租车|修车|特种车|专用车|商用车|公务用车|执勤车|(购置|采购|购买|维修|保养|维保|租赁?|更新|租用|更换)[\u4e00-\u9fa5]{1,4}车|执法车|警用车",
- "商旅服务":"商旅服务|商旅管理|住宿和餐饮|会议服务|会务服务|酒店用品|商旅管理|团餐|住宿|[商差]旅咨询|商务考察|会议安排|会议旅游|展览旅游|(酒店|[\u4e00-\u9fa5]{1,2}票)[预代]订|[预代]订(酒店|[\u4e00-\u9fa5]{1,2}票)|商务考察",
- "消防服务":"消防行业|消防[\u4e00-\u9fa5]{0,4}(维保|维护|维修|保养|检测|安全评估|风险评估|评估|年度|宣传|设计)|(消防|灭火|防火)(设施|技术|安全|工程|器材|用品|物资|装备|装置|设备|系统|服务|排查|改造|项目|整改|整治)|消火栓|消防水?带|消防腰斧|消防泵|救援应急|应急救援|防汛|三防|抢险|火灾隐患|救灾装备|救灾物资|灭火器",
- "工程类项目":"工程管理|造价咨询|工程设计|施工图设计|工程咨询|工程规划|工程监理|施工总?承包|(升级|改造)(工程|项目)|工程款|施工管理|工程采购|施工进度(管理|监控)|建筑工程|装饰工程|市政工程|公路工程|桥梁工程|水利工程|能源工程|专业施工|修缮工程|工程服务|工程技术与设计|工程评价|工程造价|勘察|勘测|装修工程|工程(项目|总?承包)?$|EPC|工程建设|修缮提升|厂房项目|室建设|扩建|装修|农田建设|巩固提升工程|改造提升|总承包|工程建筑|建筑物?拆除|建筑装饰和装修|工程施工|小区改造|(楼|学|房|室|院|站)(工程|改造|修缮|扩建|建设|拆|提升|迁)"
- "|立项|项目投资|可行性研究|可研|环境评价|环境影响|环境评测|环评|(水保|环保|环境保护)(编制|验收|监测)|稳定风险|社会稳定|风险评估|(水影响|能源|交通影响|地质灾害|地址灾害|地震安全性|地震安全性|气象|雷击风险|安全|海洋|森林环境)(评[价估测])|水土保持|(水|交|灾|震|能|气|安|海|林)评|(决算书|预算|结算|造价|决算)(编制|咨询)|(施工图(纸|)|初步|项目|工程)(方案|)设计|测绘|勘查|(施工图(纸|)|防雷|消防|人防)审查|施工许可证|施工准备|监理|资格预审|资审|竣工|验收",
- "信息系统":"信息系统|智能化|软件|系统集成服务|网络安全|远程管理服务|应用开发|云计算|信息平台|智慧数字|系统开发|信息化建设|智慧课堂|信息技术|网络链路|电子化|大数据|数字化|云系统|管理系统|硬件|(系统|平台)(建设|维护|维保|运维)|人脸识别|智能分析|数据分析|虚拟化|软件开发|(智慧|智能|数字|智数|电子)[^,,。、;;]{0,8}(平台|管理|系统|项目|建设|开发)|编程技术|自动化|数据处理|元数据|网络服务|网络工程|信息管理|服务终端|云平台|数据支撑|图形开发|网络系统|监控[\u4e00-\u9fa5]{0,2}系统|导航系统|互联网接入|互联网平台|网络接入|服务平台|互联网安全",
- "医疗产品":"医疗产品|医疗器械|医疗耗材|疫情(防控)?物资",
- "土地流转":"土地流转|(地|田|塘|国土|水面|资产).{0,6}(经营权|使用权|租赁|流转|承包|出租|转包)"
- }
- # 项目标签 关键词
- def get_project_label_keywords():
- # import csv
- path = os.path.dirname(__file__) + '/project_label_keywords.csv'
- with open(path, 'r') as f:
- reader = csv.reader(f)
- key_word_list = []
- for r in reader:
- if r[0] == '类型':
- continue
- _type = r[0]
- key_wrod = r[1]
- key_paichuci = str(r[2])
- key_paichuci = key_paichuci if key_paichuci and key_paichuci != 'nan' else ""
- type_paichuci = str(r[3])
- type_paichuci = type_paichuci if type_paichuci and type_paichuci != 'nan' else ""
- key_word_list.append((_type, key_wrod, key_paichuci, type_paichuci))
- return key_word_list
- class PreprojectFill():
- def __init__(self):
- self.ots_client = getConnect_ots()
- self.task_queue = Queue(3000)
- self.fill_concat_queue = Queue(10000)
- self.purchaseIntention_process_queue = Queue(3000)
- self.re_medical_product = get_medical_product_list()
- self.product_classifier_dict = get_product_classifier_dict()
- self.project_label_keywords = get_project_label_keywords()
- log("PurchaseIntentionProcess load keywords files sucess")
- def fill_comsumer(self):
- def comsumer_handle(_row,result_queue):
- if _row.get(preproject_uuid) is None or _row.get("status") is None:
- _preproject = Preproject(_row)
- # 删除无uuid数据
- _preproject.delete_row(self.ots_client)
- return
- if _row.get(preproject_has_bidfile) is None:
- json_docids = _row.get(preproject_json_docids)
- if json_docids is not None:
- docids = json.loads(json_docids)
- list_docids = []
- for a in docids[:30]:
- for b in a.split(","):
- list_docids.append(b)
- atta_query = BoolQuery(should_queries=[TermQuery("docids",_d) for _d in list_docids[:100]])
- atta_b_q = BoolQuery(must_queries=[TermQuery("classification","招标文件"),atta_query])
- atta_rows,atta_next_token,atta_total_count,_ = self.ots_client.search("attachment","attachment_index",
- SearchQuery(atta_b_q,get_total_count=True,limit=1),
- ColumnsToGet(["docids"],ColumnReturnType.SPECIFIED))
- if atta_total_count>0:
- _row[preproject_has_bidfile] = 1
- else:
- _row[preproject_has_bidfile] = 0
- _preproject = Preproject(_row)
- _preproject.update_row(self.ots_client)
- _mul = MultiThreadHandler(self.task_queue,comsumer_handle,None,10)
- _mul.run()
- def fill_producer(self):
- # 存在uuid数据,补充'has_bidfile'字段
- q1 = BoolQuery(must_queries=[
- ExistsQuery("uuid"),
- BoolQuery(must_not_queries=[
- ExistsQuery("has_bidfile")
- ])
- ])
- # 无uuid数据,用于删除行数据
- # q2 = BoolQuery(must_not_queries=[
- # ExistsQuery("uuid")
- # ])
- q2 = BoolQuery(should_queries=
- [ BoolQuery(must_not_queries=[ExistsQuery("uuid")]),
- BoolQuery(must_not_queries=[ExistsQuery("status")])
- ])
- columns = ["uuid","has_bidfile","json_docids","status"]
- query = BoolQuery(should_queries=[q1,
- q2])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
- SearchQuery(query,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
- ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
- if len(rows) > 0:
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- self.task_queue.put(_row)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
- SearchQuery(query,next_token=next_token,get_total_count=True,limit=100),
- ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
- if len(rows)>0:
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- self.task_queue.put(_row)
- def fill_contact_producer(self):
- q1 = BoolQuery(must_queries=[
- TermQuery("status",1),
- ExistsQuery("uuid")
- ])
- columns = ["status",preproject_tenderee,preproject_last_tenderee_contact,preproject_last_tenderee_phone,preproject_last_win_tenderer,preproject_last_win_tenderer_contact,preproject_last_win_tenderer_phone]
- query = q1
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
- SearchQuery(query,sort=Sort(sorters=[FieldSort("crtime",SortOrder.DESC)]),get_total_count=True,limit=100),
- ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- self.fill_concat_queue.put(_row)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
- SearchQuery(query,next_token=next_token,get_total_count=True,limit=100),
- ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- self.fill_concat_queue.put(_row)
- def fill_contact_comsumer(self):
- def comsumer_handle(_row,result_queue):
- product = _row.get(preproject_product)
- tenderee = _row.get(preproject_tenderee)
- tenderee_concat = _row.get(preproject_last_tenderee_contact)
- tenderee_phone = _row.get(preproject_last_tenderee_phone)
- win_tenderer = _row.get(preproject_last_win_tenderer)
- win_tenderer_concat = _row.get(preproject_last_win_tenderer_contact)
- win_tenderer_phone = _row.get(preproject_last_win_tenderer_phone)
- if tenderee is not None and tenderee!="":
- # if (tenderee_concat is None or tenderee_concat=="") and (tenderee_phone is None or tenderee_phone==""):
- if tenderee_phone is None or tenderee_phone=="":
- #fill tenderee concat and phone
- bool_query = BoolQuery(must_queries=[
- TermQuery(preproject_tenderee,tenderee),
- BoolQuery(should_queries=[
- MatchPhraseQuery("doctextcon",product),
- MatchPhraseQuery("attachmenttextcon",product)
- ])
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=100),
- ColumnsToGet(["tenderee_contact","tenderee_phone"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- _find = False
- for _data in list_data:
- tenderee_contact = _data.get("tenderee_contact")
- tenderee_phone = _data.get("tenderee_phone")
- # if (tenderee_contact is not None and tenderee_contact!="") or (tenderee_phone is not None and tenderee_phone!=""):
- if tenderee_phone is not None and tenderee_phone!="":
- _find = True
- _row[preproject_last_tenderee_contact] = tenderee_contact
- _row[preproject_last_tenderee_phone] = tenderee_phone
- break
- if not _find:
- bool_query = BoolQuery(must_queries=[
- TermQuery("status", 1),
- TermQuery("enterprise_name",tenderee)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("enterprise_contact","enterprise_contact_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("score",SortOrder.DESC)]),limit=5),
- ColumnsToGet(["contact_person","phone_no"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- if len(list_data)>0:
- # _data = list_data[0]
- for _data in list_data:
- tenderee_contact = _data.get("contact_person")
- tenderee_phone = _data.get("phone_no")
- if tenderee_phone is not None and tenderee_phone!="":
- _row[preproject_last_tenderee_contact] = tenderee_contact
- _row[preproject_last_tenderee_phone] = tenderee_phone
- break
- if win_tenderer is not None and win_tenderer!="":
- # if (win_tenderer_concat is None or win_tenderer_concat!="") and (win_tenderer_phone is None or win_tenderer_phone==""):
- if win_tenderer_phone is None or win_tenderer_phone=="":
- # fill win_tenderer concat and phone
- bool_query = BoolQuery(must_queries=[
- TermQuery("win_tenderer",win_tenderer),
- BoolQuery(should_queries=[
- MatchPhraseQuery("doctextcon",product),
- MatchPhraseQuery("attachmenttextcon",product)
- ])
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=100),
- ColumnsToGet(["win_tenderer_manager","win_tenderer_phone"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- _find = False
- for _data in list_data:
- _contact = _data.get("win_tenderer_manager")
- _phone = _data.get("win_tenderer_phone")
- # if (_contact is not None and _contact!="") or (_phone is not None and _phone!=""):
- if _phone is not None and _phone!="" and _phone!=_row.get(preproject_last_tenderee_phone):
- _find = True
- _row[preproject_last_win_tenderer_contact] = _contact
- _row[preproject_last_win_tenderer_phone] = _phone
- break
- if not _find:
- bool_query = BoolQuery(must_queries=[
- TermQuery("status", 1),
- TermQuery("enterprise_name",win_tenderer)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("enterprise_contact","enterprise_contact_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("score",SortOrder.DESC)]),limit=5),
- ColumnsToGet(["contact_person","phone_no"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- if len(list_data)>0:
- # _data = list_data[0]
- for _data in list_data:
- _contact = _data.get("contact_person")
- _phone = _data.get("phone_no")
- # _phone不为空,且不等于招标人电话
- if _phone is not None and _phone!="" and _phone!=_row.get(preproject_last_tenderee_phone):
- _row[preproject_last_win_tenderer_contact] = _contact
- _row[preproject_last_win_tenderer_phone] = _phone
- break
- _row["status"] = 0
- _preproject = Preproject(_row)
- _preproject.setValue("status",2,True)
- _preproject.update_row(self.ots_client)
- _mul = MultiThreadHandler(self.fill_concat_queue,comsumer_handle,None,20)
- _mul.run()
- # 预测产品分类 project_name,demand,doctitle
- def get_product_classification(self, tenderee, product, project_name, last_project_name, demand, info_type):
- product = product.replace(tenderee, "")
- project_name = project_name if project_name else ""
- project_name = project_name.replace(tenderee, "")
- last_project_name = last_project_name if last_project_name else ""
- last_project_name = last_project_name.replace(tenderee, "")
- if not project_name:
- project_name = last_project_name
- demand = demand if demand else ""
- demand = demand.replace(tenderee, "")
- info_type = info_type if info_type else ""
- class_list = []
- if not class_list:
- for key, value in service_key_word.items():
- if re.search(value, product):
- class_list.append(key)
- for key, value in product_key_word.items():
- if re.search(value, product):
- class_list.append(key)
- for key, value in self.product_classifier_dict.items():
- if re.search(value, product):
- class_list.append(key)
- if "医疗产品" not in class_list and re.search(self.re_medical_product, product):
- class_list.append("医疗产品")
- # project_name
- if project_name and len(class_list) == 0:
- for key, value in service_key_word.items():
- if re.search(value, project_name):
- class_list.append(key)
- for key, value in product_key_word.items():
- if re.search(value, project_name):
- class_list.append(key)
- for key, value in self.product_classifier_dict.items():
- if re.search(value, product):
- class_list.append(key)
- if "医疗产品" not in class_list and re.search(self.re_medical_product, project_name):
- class_list.append("医疗产品")
- # info_type
- if info_type and len(class_list) == 0:
- for key, value in service_key_word.items():
- if re.search(value, info_type):
- class_list.append(key)
- for key, value in product_key_word.items():
- if re.search(value, info_type):
- class_list.append(key)
- for key, value in self.product_classifier_dict.items():
- if re.search(value, product):
- class_list.append(key)
- if "医疗产品" not in class_list and re.search(self.re_medical_product, info_type):
- class_list.append("医疗产品")
- # demand
- if demand and len(class_list) == 0:
- for key, value in service_key_word.items():
- if re.search(value, demand):
- class_list.append(key)
- for key, value in product_key_word.items():
- if re.search(value, demand):
- class_list.append(key)
- for key, value in self.product_classifier_dict.items():
- if re.search(value, product):
- class_list.append(key)
- if "医疗产品" not in class_list and re.search(self.re_medical_product, demand):
- class_list.append("医疗产品")
- class_list = list(set(class_list))
- class_list.sort(key=lambda x: x)
- class_list = class_list[:3]
- class_list = ",".join(class_list)
- return class_list
- # 项目标签
- def get_project_label(self, product, demand, tenderee, agency):
- product = product if product else ""
- demand = demand if demand else ""
- tenderee = re.sub("\s", "", tenderee) if tenderee else ""
- agency = re.sub("\s", "", agency) if agency else ""
- # main_text = ",".join(list(set([product,last_doctitle,demand,project_name,last_project_name])))
- main_text = ",".join(list(set([product, demand])))
- main_text = re.sub("\s", "", main_text)
- doctitle = product
- doctitle = re.sub("\s", "", doctitle)
- # 查询字段排除tenderee、agency
- if tenderee:
- doctitle = doctitle.replace(tenderee, "")
- main_text = main_text.replace(tenderee, "")
- if agency:
- doctitle = doctitle.replace(agency, "")
- main_text = main_text.replace(agency, "")
- doctitle_dict = dict()
- main_text_dict = dict()
- for item in self.project_label_keywords:
- _type = item[0]
- key_wrod = item[1]
- # 关键词排除词
- key_paichuci = item[2]
- key_paichuci_s = "|".join(key_paichuci.split('、'))
- # 类型排除词
- type_paichuci = item[3]
- if type_paichuci:
- paichuci_split = type_paichuci.split('、')
- if re.search("|".join(paichuci_split), main_text):
- continue
- if doctitle:
- if key_wrod in doctitle:
- if not key_paichuci_s or (key_paichuci_s and not re.search(key_paichuci_s, doctitle)):
- key_wrod_count1 = doctitle.count(key_wrod)
- if _type not in doctitle_dict:
- doctitle_dict[_type] = {'关键词': [], '排除词': type_paichuci}
- doctitle_dict[_type]['关键词'].append((key_wrod, key_wrod_count1))
- if main_text:
- if key_wrod in main_text:
- if not key_paichuci_s or (key_paichuci_s and not re.search(key_paichuci_s, main_text)):
- key_wrod_count2 = main_text.count(key_wrod)
- if _type not in main_text_dict:
- main_text_dict[_type] = {'关键词': [], '排除词': type_paichuci}
- main_text_dict[_type]['关键词'].append((key_wrod, key_wrod_count2))
- # 排序 doctitle
- for k, v in doctitle_dict.items():
- doctitle_dict[k]['关键词'].sort(key=lambda x: x[1], reverse=True)
- # 按匹配次数保留前10个标签
- if len(doctitle_dict) > 10:
- doctitle_labels = [(k, sum(w[1] for w in doctitle_dict[k]['关键词'])) for k in doctitle_dict]
- doctitle_labels.sort(key=lambda x: x[1], reverse=True)
- for item in doctitle_labels[10:]:
- doctitle_dict.pop(item[0])
- # main_text
- pop_list = []
- for k, v in main_text_dict.items():
- if sum([j[1] for j in main_text_dict[k]['关键词']]) == 1:
- pop_list.append(k)
- main_text_dict[k]['关键词'].sort(key=lambda x: x[1], reverse=True)
- # if len(pop_list)<len(main_text_dict):
- # for k in pop_list:
- # main_text_dict.pop(k)
- # 按匹配次数保留前10个标签
- if len(main_text_dict) > 10:
- main_text_labels = [(k, sum(w[1] for w in main_text_dict[k]['关键词'])) for k in main_text_dict]
- main_text_labels.sort(key=lambda x: x[1], reverse=True)
- for item in main_text_labels[10:]:
- main_text_dict.pop(item[0])
- doctitle_dict = [i for i in doctitle_dict.keys()]
- doctitle_dict.sort(key=lambda x: x)
- doctitle_dict = ','.join(doctitle_dict) if doctitle_dict else None
- main_text_dict = [i for i in main_text_dict.keys()]
- main_text_dict.sort(key=lambda x: x)
- main_text_dict = ','.join(main_text_dict) if main_text_dict else None
- return doctitle_dict, main_text_dict
- def purchaseIntention_process_comsumer(self):
- def comsumer_handle(_row,result_queue):
- docid = _row.get('docid')
- tenderee = _row.get('tenderee')
- demand_info = _row.get('demand_info')
- project_name = _row.get('project_name')
- if demand_info is not None and 'data":[]' not in demand_info:
- result = process_purchaseIntention(docid, tenderee, demand_info, project_name)
- for item in result:
- _tenderee, final_product, product_list, order_begin, order_end, demand, _project_name, budget, json_docids = item
- if order_begin and _tenderee is not None and len(_tenderee)>3 and final_product is not None and len(final_product)>1:
- province = _row.get('province')
- city = _row.get('city')
- district = _row.get('district')
- doctitle = _row.get('doctitle')
- tenderee_contact = _row.get('tenderee_contact')
- tenderee_phone = _row.get('tenderee_phone')
- info_type = _row.get('info_type')
- result_row = dict()
- result_row['tenderee'] = _tenderee
- result_row['product'] = final_product
- result_row['may_begin'] = order_begin
- result_row['may_end'] = order_end
- result_row['crtime'] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
- result_row['type'] = 1
- result_row['demand'] = demand
- result_row['project_name'] = _project_name
- result_row['bidding_budget'] = budget
- result_row['prob'] = 1.0
- result_row['json_docids'] = json_docids
- result_row['province'] = province
- result_row['city'] = city
- result_row['district'] = district
- result_row['in_doctextcon'] = 0
- result_row['last_doctitle'] = doctitle
- result_row['last_tenderee_contact'] = tenderee_contact
- result_row['last_tenderee_phone'] = tenderee_phone
- result_row['status'] = 1
- result_row['uuid'] = str(uuid4())
- result_row['product_classification'] = self.get_product_classification(_tenderee, final_product, _project_name, "", demand, info_type)
- doctitle_product_labels,core_field_product_labels = self.get_project_label(final_product, demand, _tenderee, "")
- result_row['doctitle_product_labels'] = doctitle_product_labels
- result_row['core_field_product_labels'] = core_field_product_labels
- # tenderee, product, may_begin, may_end, crtime, type, demand, project_name, bidding_budget, prob, json_docids, province, city, district,
- # in_doctextcon, last_doctitle, last_tenderee_contact, last_tenderee_phone
- _preproject = Preproject(result_row)
- if not _preproject.exists_row(self.ots_client):
- _preproject.update_row(self.ots_client)
- _mul = MultiThreadHandler(self.purchaseIntention_process_queue,comsumer_handle,None,20)
- _mul.run()
- def purchaseIntention_process_producer(self):
- columns = ['tenderee','demand_info','project_name','province', 'city', 'district','doctitle', 'tenderee_contact',
- 'tenderee_phone', 'info_type']
- end_page_time = time.strftime("%Y-%m-%d",time.localtime())
- start_page_time = timeAdd(end_page_time,days=-5)
- start_opertime = end_page_time + " 00:00:00" # start_opertime默认值为当天0点
- end_opertime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
- log_path = log_purchaseIntentionProcess_opertime_path
- all_log_msg = []
- with open(log_path, mode='r', encoding='utf-8') as f:
- all_log_msg = f.readlines()
- if all_log_msg:
- if all_log_msg[-1]=='\n':
- all_log_msg = all_log_msg[:-1]
- last_opertime = ""
- for _log in all_log_msg[::-1]:
- if "end opertime time:" in _log:
- try:
- last_opertime = re.search('(?:end opertime time:)(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})',_log).group(1)
- break
- except:
- continue
- if last_opertime:
- start_opertime = last_opertime
- log_msg = "PurchaseIntention_process start, start opertime:%s, end opertime time:%s " % (start_opertime,end_opertime)
- all_log_msg = all_log_msg[-2000:]
- all_log_msg.append(log_msg + '\n')
- if all_log_msg:
- with open(log_path, mode='w', encoding='utf-8') as f:
- f.writelines(all_log_msg)
- query = BoolQuery(must_queries=[
- TermQuery('save',1), # 去重数据
- TermQuery('docchannel',114), # 采购意向类型公告
- RangeQuery('page_time',range_from=start_page_time,range_to=end_page_time,include_upper=True),
- RangeQuery('opertime',range_from=start_opertime,range_to=end_opertime)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(query,sort=Sort(sorters=[FieldSort("page_time")]),get_total_count=True,limit=100),
- ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
- if len(rows) > 0:
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- self.purchaseIntention_process_queue.put(_row)
- log("purchaseIntention_process ots query total_count:%d"%total_count)
- while next_token:
- rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_tmp", "document_tmp_index",
- SearchQuery(query,next_token=next_token,
- get_total_count=True,limit=100),
- ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
- if len(rows) > 0:
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- self.purchaseIntention_process_queue.put(_row)
- def schedule(self):
- _scheduler = BlockingScheduler()
- _scheduler.add_job(self.fill_producer,"cron",minute="*/10")
- _scheduler.add_job(self.fill_comsumer,"cron",minute="*/10")
- _scheduler.add_job(self.fill_contact_producer,"cron",minute="*/1")
- _scheduler.add_job(self.fill_contact_comsumer,"cron",minute="*/1")
- _scheduler.add_job(self.purchaseIntention_process_producer, "cron", minute="*/30")
- _scheduler.add_job(self.purchaseIntention_process_comsumer, "cron", minute="*/30")
- _scheduler.start()
- def start_fill_preproject():
- preprojectFill = PreprojectFill()
- preprojectFill.schedule()
- # 采购意向数据处理
- def process_purchaseIntention(docid, tenderee, json_demand_info, doc_project_name):
- def getProduct(tenderee, _product, project_name, doc_project_name):
- if len(_product) > 0:
- _product.sort(key=lambda x: len(x), reverse=True)
- return _product[0]
- else:
- product = ""
- pj_name = str(project_name).replace(tenderee, "")
- pj_name = re.sub(".*公司|项目|采购", "", pj_name)
- if len(pj_name) < 25:
- product = pj_name
- if not product:
- if doc_project_name:
- doc_project_name = str(doc_project_name).replace(tenderee, "")
- doc_project_name = re.sub(".*公司|项目|采购", "", doc_project_name)
- product = doc_project_name
- return product
- # 时间格式范围准确性
- def formatTime(_date):
- if _date is not None:
- _d = _date.split("-")
- if len(_d) == 3:
- if int(_d[0]) > 2010 and int(_d[0]) < 2050 and int(_d[1]) > 0 and int(_d[1]) <= 12 and int(
- _d[2]) > 0 and int(_d[2]) <= 31:
- return "%s-%s-%s" % (_d[0].rjust(4, "2"), _d[1].rjust(2, "0"), _d[2].rjust(2, "0"))
- # 时间范围规范性
- def get_begin_end_time(order_begin, order_end):
- try:
- order_begin = time.mktime(time.strptime(order_begin, "%Y-%m-%d"))
- order_end = time.mktime(time.strptime(order_end, "%Y-%m-%d"))
- mid_time = (order_end - order_begin) / (24 * 60 * 60)
- if mid_time >= 0 and mid_time < 125:
- return True
- else:
- return False
- except:
- return False
- def process(docid, tenderee, json_demand_info, doc_project_name):
- if json_demand_info is None:
- return
- try:
- demand_info = json.loads(json_demand_info)
- except Exception as e:
- return
- result_list = []
- for _line in demand_info["data"]:
- try:
- _product = _line.get("product", [])
- _product.sort(key=lambda x: x)
- product_list = json.dumps(_product, ensure_ascii=False)
- order_end = _line.get("order_end")
- order_end = formatTime(order_end)
- project_name = _line.get("project_name", "")
- project_name = re.sub("\s", "", project_name)
- demand = _line.get("demand")
- budget = _line.get("budget")
- _tenderee = _line.get("tenderee", "")
- _tenderee = re.sub("\s", "", _tenderee)
- if _tenderee:
- # 优先选择表格中对应的采购单位而不是全文提取到的
- tenderee = _tenderee
- if budget:
- budget = float(budget)
- else:
- budget = 0.0
- order_begin = _line.get("order_begin")
- order_begin = formatTime(order_begin)
- if order_begin and order_end:
- if not get_begin_end_time(order_begin, order_end):
- continue
- if not order_begin or not order_end:
- continue
- product = getProduct(tenderee, _product, project_name, doc_project_name)
- product = re.sub("\s", "", product)
- json_docids = json.dumps([str(docid)], ensure_ascii=False)
- final_product = ""
- if project_name:
- # 优先使用 project_name作为产品
- final_product = project_name
- else:
- final_product = product
- if final_product:
- result_list.append([tenderee, final_product, product_list, order_begin, order_end, demand, project_name,budget, json_docids])
- except Exception as e:
- # logging.info("============error:%s" % (str(e)))
- pass
- return result_list
- result = process(docid, tenderee, json_demand_info, doc_project_name)
- if not result:
- return []
- else:
- return result
- def delete_wrong_data():
- import pandas as pd
- list_data = []
- task_queue = Queue()
- ots_client = getConnect_ots()
- q1 = BoolQuery(must_queries=[
- WildcardQuery("may_begin","000*"),
- # RangeQuery("crtime",'2022-05-24')
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
- SearchQuery(q1,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- dict_rows = getRow_ots(rows)
- list_data.extend(dict_rows)
- _count = len(dict_rows)
- print("total_count",total_count)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
- SearchQuery(q1,next_token=next_token,get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- dict_rows = getRow_ots(rows)
- list_data.extend(dict_rows)
- _count += len(dict_rows)
- print("%d/%d"%(_count,total_count))
- # if _count>10000:
- # break
- # df = pd.read_csv("fa45de36-6e47-4817-b2d6-e79ea8c154601.csv")
- # for tenderee,product,may_begin,may_end in zip(df["tenderee"],df["product"],df["may_begin"],df["may_end"]):
- # _data = {"tenderee":tenderee,
- # "product":product,
- # "may_begin":str(may_begin),
- # "may_end":str(may_end)}
- # list_data.append(_data)
- for _data in list_data:
- # may_begin = _data.get("may_begin")
- # may_end = _data.get("may_end")
- # if len(may_begin)!=10 or len(may_end)!=10:
- # task_queue.put(_data)
- task_queue.put(_data)
- def _handle(item,result_queue):
- _preproject = Preproject(item)
- _preproject.delete_row(ots_client)
- print(item)
- item["may_begin"] = item["may_begin"].replace("0001","2022")
- item["may_end"] = item["may_end"].replace("0001","2022")
- _preproject = Preproject(item)
- _preproject.update_row(ots_client)
- print(item)
- log("====%d"%task_queue.qsize())
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- def fixProductName():
- from BaseDataMaintenance.dataSource.source import getConnect_ots
- ots_client = getConnect_ots()
- bool_query = BoolQuery(must_queries=[MatchPhraseQuery("product","酒水降尘")])
- rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- preproject1 = Preproject(_row)
- preproject1.delete_row(ots_client)
- _row["product"] = _row["product"].replace("酒水降尘","洒水降尘")
- preproject2 = Preproject(_row)
- preproject2.update_row(ots_client)
- print(dict_rows)
- if __name__=="__main__":
- # preprojectFill = PreprojectFill()
- # preprojectFill.schedule()
- # delete_wrong_data()
- # fixProductName()
- start_fill_preproject()
- pass
|