fillColumns.py 54 KB


  1. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  2. from apscheduler.schedulers.blocking import BlockingScheduler
  3. from BaseDataMaintenance.dataSource.source import *
  4. from BaseDataMaintenance.common.Utils import *
  5. from queue import Queue
  6. from tablestore import *
  7. from BaseDataMaintenance.model.ots.Preproject import *
  8. from uuid import uuid4
  9. import csv
  10. log_purchaseIntentionProcess_opertime_path = "/home/appuser/python/purchaseIntention_process_opertime.log"
  11. # 医疗产品 关键词
  12. def get_medical_product_list():
  13. # path = "work/medical_product_keyword.csv"
  14. path = os.path.dirname(__file__) + '/medical_product_keyword.csv'
  15. with open(path, 'r',encoding='utf-8') as f:
  16. reader = csv.reader(f)
  17. row_list = []
  18. for r in reader:
  19. row_list.append(r)
  20. row_list = row_list[1:]
  21. product_word_list = []
  22. for row in row_list:
  23. for word in row:
  24. if word:
  25. word = re.sub("\s|\|$","",word)
  26. product_word_list.append(word)
  27. re_medical_product = "|".join(list(set(product_word_list)))
  28. re_medical_product = re_medical_product.replace("(","\(")
  29. re_medical_product = re_medical_product.replace(")","\)")
  30. return re_medical_product
  31. # 行业分类 关键词
  32. def get_product_classifier_dict():
  33. # path = "work/preproject_product_keyword.csv"
  34. path = os.path.dirname(__file__) + '/preproject_product_keyword.csv'
  35. with open(path, 'r',encoding='utf-8') as f:
  36. reader = csv.reader(f)
  37. row_list = []
  38. for r in reader:
  39. row_list.append(r)
  40. row_list = row_list[1:]
  41. product_classifier_dict = dict()
  42. for row in row_list:
  43. keywords = row[4]
  44. _class = row[6]
  45. if _class:
  46. if _class not in product_classifier_dict:
  47. product_classifier_dict[_class] = []
  48. product_classifier_dict[_class].extend(keywords.split(","))
  49. else:
  50. product_classifier_dict[_class].extend(keywords.split(","))
  51. for key,value in product_classifier_dict.items():
  52. value = [word for word in value if word]
  53. value = list(set(value))
  54. product_classifier_dict[key] = "|".join(value)
  55. return product_classifier_dict
  56. # 服务类产品行业分类
  57. service_key_word = {
  58. "物业管理":"物业化?管理|物业服务|物管服务|物业综合|物业托管|物业维修|物业委托|物业$|宿舍管理|公寓管理|物业费|物业保洁|物业外包|物业项目|物业采购|物业业务|物业合同|后勤管理|后勤综合|后勤服务|后勤保障|后勤社会化",
  59. "保安服务":"保安服务|保安人员|保安管理|保安$|安保服务|安全保卫|秩序维护|保安劳务|(采购|雇佣|选聘|招聘|招募|聘用|聘请)保安|保安(费|经费|工资|支出|外包|保洁费|管护费|人员经费)|保安(项目|合同|业务|采购|比选)|保安人员工程|保安公司(比选|入围|招聘)|保安大厦|保安工作|保安派遣|安保劳务派遣|驻.{1,2}保安|保安工程|保安人员采购|(?:雇佣|选聘|招聘|招募|聘用|聘请)[^。;;司]+保安|保安人员项目|安保人员|治安保卫|安保管理|安保外包|车辆管理|门卫服务|安保",
  60. "保洁服务":"保洁服务|清洁服务|清洗服务|清扫服务|清洁卫生|保洁外派|保洁项目|保洁费|聘请专业保洁|保洁劳务|垃圾清运|环卫保洁|清洗保洁|保洁对外承包|清扫保洁(?!大队)|保洁员?工资|道路保洁|保洁劳务外包|保洁外包|保洁工程|保洁采购|区域养护保洁|扬尘治理|河道保洁|保洁公司项目|后勤保洁|垃圾收运|卫生保洁|垃圾清运人员劳务|保洁合同|保洁专业委托|保洁及维修人员|[房楼室]保洁|日常保洁|保洁招标|弃物清运|环境保洁|[公道]路养护|保洁业务(外包)?|[室户]内保洁|室外保洁|公厕保洁|乡道保洁|校园保洁|保洁管理费?|保洁发包|河道管护|人工保洁|机械保洁|公园保洁|常规保洁|保洁消杀|区域保洁|保洁工作承包|社区保洁|绿地保洁|绿化保洁|保洁管护|保洁政府采购|精细化保洁|保洁承包|保洁养护|保洁社会化|保洁市场化|保洁作业|保洁人员采购|养护保洁|保洁专项委托|保洁工资|聘用保洁人员|便道保洁|消杀|消毒杀菌|生物防治|除四害|助洁服务",
  61. "绿化养护":"绿化养护|绿化服务|绿植服务|绿植养护|绿化合同|道路绿化|绿地养护|绿化维护|绿化生态修复|绿化养管维护|绿化面积养护|绿化提升|绿化管护|绿化管养|绿化施工|绿化品质提升|绿化管理|绿化及养护|绿化景观|绿化日常维护|绿化专项养护|绿化保洁一体化|绿化人工劳务|园林绿化$|绿化采购|绿化(?:.{1,2}期)?项目|绿化工程|绿化管养|绿化建设|绿化防寒|绿化完善|绿化工?外包|景观绿化|绿化美化|绿化订购|绿化劳务|绿化应急|绿化补植|绿化面积管护|绿化管理日常维护|绿化人员劳务|绿化环境|绿化种植|绿化植树|绿化日常养护|绿化植物养护|绿化搬迁|绿[化地]社会化(承包|养护|管理)|绿化栽植|绿化整治|绿化等养护|绿地管护|绿化提质|绿化工作|绿[化地]市场化(承包|养护|管理)|绿化修补|绿化迁移|绿化年度养护|绿化改造|绿化补栽|绿化临时用工|绿化$|绿化养管|绿化维保|绿化修剪|造林",
  62. "运营维护":"运营维护|运维服务|运营项目|驻场运维服务|运营管理|托管运营|合作运营|运营合作|运营承包|运营投资|运营合作管理|委托运营|整体运营|第三方运营|运营及管理",
  63. "食堂承包":"食堂承包|食堂餐饮服务|餐饮外包|餐厅外包|餐饮服务|餐厅服务|食堂服务|餐饮管理服务|食堂服务外包|食堂经营服务|食堂经营权|食堂委托管理|食堂[业服]务外包|食堂主副食供|食堂劳务外包|食堂托管运营|食堂劳务承包|食堂劳务用工|食堂供餐|食材定点供应|餐厅遴选|食堂经营合作|引进社会餐饮|经营食堂|食堂厨师团队|食堂及超市对外承包|食堂托管劳务|食堂餐饮准入资格|食堂配送资格|食堂后厨外包|餐厅经营|食堂对外承包|食堂超市招租|食堂厨师外包|食堂托管经营|食堂劳务(政府)?采购|食堂(及生活超市)?经营权|餐厅承包经营|食堂引入餐饮公司|食堂(和餐厅)?公开招标|食堂委托|食堂及超市经营权|食堂管理运营|食堂.{0,2}承租|食堂合作经营|食堂委托经营|食堂(经营区域)?商户遴选|食堂目标责任制管理经营|食堂整体承包|食堂外包|食堂经营资质|食堂管理采购|食堂经营管理|食堂招标项目|食堂托管费|食堂对外出租经营|食堂委托运营|食堂招租|食堂经营项目|食堂投资经营权|食堂劳务|食堂运营|食堂托管|食堂经营承包|食堂管理费|餐厅经营权|餐饮技术合作|食堂采购$|食堂工程|食堂合同|餐厅委托经营|食堂社会化管理|食堂运行|食堂采购项目|食堂餐饮管理|食堂管理|餐厅经营管理|食堂、商店运营招租|食堂厨务管理|食堂员工劳务|食堂外委经营|食堂餐饮采购|食堂管理社会化|食堂供应商招标|食堂代理|食堂招标采购|食堂运营承包|食堂后厨承包|食堂费|食堂.{0,3}经营权|食堂人员外包|食堂聘用第三方外包|食堂.{0,2}经营者遴选|食堂整体委托运营|食堂承作|食堂招聘餐饮公司|食堂经营承包权|食堂、小卖部对外招租|配餐服务",
  64. "食材配送":"食材等?配送|食材(政府)?采购(及?配送)?|食堂物资配送|餐饮配送|食堂配送|食[材品]供[货应给]|食材招标|食材定点采购|食材定点配送|食品配送|食材(批量)?集中配送|食材(批量)?集中采购|食材集采|食材集配|食材采配|食材项目|食材[^,。;;]{0,25}配送|食材中标采购|食材[^,。;;]{0,10}供[应货]商|食材原料配送|食材统一?配|食材需求采购|食材批量采购|食材批量配送|食材外包配送|配送食材费|食材框架|食材定点供[应货]|食材年度(定点)?采购|食材合同|食材招投标|副食配送|食材代采购|食材(食品)?[^,。;;]{0,10}(采购|配送)|食材综合配送|食材年度供[应货]商|食材集成供[应货]商|配送食材|采购食材|食材服务商|食材类供[应货]商|食材工程|[主副]食原?材料(招标|采购|供应|供货)|食堂(大宗)?物资|食堂原材?料(采购|购买|配送)|副食|食品批发|饮料批发|食堂食材",
  65. "环卫服务":"环卫服务|环境卫生服务|环卫保洁|环卫一体化|环卫市场化|环卫.{0,1}绿化一体化|环卫作业|环卫清扫|道路清扫|清雪服务|垃圾清运|环卫外包|环卫日常养护|环卫整体运营|环卫护路一体化|环卫(综合)?提升|环卫统筹|环卫项目|环卫管理承包|环卫采购|环卫工作市场化|环卫管理劳务|环境卫生作业|环卫养护|道路环卫|环卫、垃圾分类|环卫人员劳务|环卫园林一体化|环卫清运|环卫垃圾清运|劳务发包|劳务业务外包|(?:雇佣|选聘|招聘|招募|聘用|聘请)劳务人员",
  66. "劳务派遣":"劳务派遣|人力劳务[分外]包|劳务服务|劳务外包|劳务费|劳务承包|劳务用工|劳务合作|劳务采购|外包劳务机构|劳务承揽|劳务人员派遣|雇佣劳务公司|劳务集中采购|劳务入库|劳务市场化|劳务管理|劳务专业外包|劳务项目|劳务招标|劳务招聘|劳务协作|劳务社会化|外委劳务|劳务承作",
  67. "布草洗涤":"洗涤服务|布草类?洗涤|织物洗涤|工[装衣服]洗涤|工作服洗涤|棉织品洗涤|洗涤采购|洗涤项目|被服洗涤|制服洗涤|布类洗涤|衣物洗涤|棉制品洗涤|洗涤费|洗涤(操作)?业务外包|洗涤外包|洗涤合同|洗涤全?承包|医疗洗涤|洗涤业务|租赁洗涤一体化|洗涤消毒|卧具洗涤|洗涤劳务外包|布草类?清洗|洗衣服务|(服装|被服|制服|布草)清洗"
  68. }
  69. product_key_word = {
  70. "服装":"服装|被[服装]|服饰|工作?[装衣服]|棉服|校服|制服|职业装|被褥|被单|被套|床单|床罩|针织品|棉织品|纺织品|校服|军训服|西[服装]|织物|床上用品|文化衫|棉被|大衣|病[人号]?服|棉衣|执勤服|登山服|警服|窗帘|院服|T恤|[男女东夏]装|床垫|劳保用品|比赛服|防护服|衬衫|春秋常服|防寒服|防寒装备|消防服|团服|运动服|包布|棉麻制品|训练服|囚服|球服",
  71. "电器":"电器|家电|家用电器|空调|冰箱|热水器|净水器|风扇|电视|冰柜|冷[藏冻]箱|[抽除]湿机|洗碗机|饮水机|显示器|扩音设备|音响|洗衣机|电饭煲|电磁炉|烤箱|取暖器|干衣机|吸尘器",
  72. "办公用品":"办公室?用品|办公室?设备|办公耗材|办公消耗品|打印[机纸]|复印[机纸]|办公文具|电脑|计算机|显示[屏器]|多功能一体机|硒鼓|扫描仪|投影仪|碎纸机|办公用纸|[aA][34]纸|教学设备|触控一体机|速印机|墨粉|档案用品|文件夹|档案盒|档案袋|文件袋|桌面用品|打孔机|笔筒|胶带|尺子|账本|单据|发票|收据|印章|墨盒|色带|粉盒|公文柜|剪刀|胶条|胶棒|橡皮擦|回形针|直尺|美工刀|传真机|笔记本|[Ll][Ee][Dd](显示)?[屏器]|台式机|色鼓|会议设备|财务用品|丝印机|彩机纸|彩色卡纸|装订机|过塑机|电话机|摄像机|路由器|办公消耗用?品|办公纸|订书机|胶水|签字笔|计算器|粉笔|票夹|长尾夹|鼠标|键盘|硬盘|办公设施|U盘|电源|打印复印一体机|文件盒|起钉器|多媒体设备|打印机?耗材|中性笔|台式整机|印刷耗材",
  73. "家具":"家具|办公家具|课[桌椅]|柜类|密集架|桌椅|办公椅|文件柜|沙发|公寓床|保密柜|书桌|椅子|实木家具|书柜|组合床|挂镜柜|办公桌|会议桌|木床|储[物藏]柜|宿舍床|保险柜|座椅|餐桌|午休床|书包柜|床头柜|学生床|实验凳|整理柜|防潮柜|铁架床|橱柜|碗橱|吸油烟机|会议椅|椅凳|茶水柜|柜$|书架",
  74. "印刷服务":"印务|印刷服务|印刷|彩印|印刷品|印刷费|编印|印制|出版服务|宣传品|文印服务|打印服务",
  75. "法律服务":"法[律务](专项)?[服业事]务|法律顾问|法律咨询|法律援助|诉讼代理|代理诉讼|合同审查|知识产权保护|知识产权服务|法律文书|法律风险评估|合规咨询|调解.?仲裁|法律培训|法律信息服务|律师|案件代理|调解服务|司法辅助|诉讼(专项)?[服业]务|法律支持|法律追偿|司法鉴定|纠纷调解|法律文件代理|法律调查|法律尽职调查|仲裁代理|司法[服业事]务|案件(协助)?调查|行政诉讼|公证服务|法律公证",
  76. "环保服务":"环境服务|环境治理|环境整治|环境(质量)?[检监]测|[污尾]水[治处]理|综合治理|污染防治|废物处置|环境影响评[价估]|污染源监测|排污口排查|水污染治理|土地整治|生态监测|生态保护|环保咨询|环保工程|生态修复|新能源技术|环境影响评价|环境风险评估|废[水气]处理|噪声控制|废弃物资源化利用|再生资源回收利用|废弃资源综合利用|水土保持|土壤污染状况调查|废弃物回收处理",
  77. "照明服务":"照明服务|照明工程|照明亮化|亮化工程|路灯|灯具|灯饰|道路照明|夜景照明|景观照明|展示照明|广告牌?照明|家居照明|智能照明|办公照明|照明(方案|施工图)?设计|照明设备的?安装|景观灯|定制灯具|照明改造|建筑照明|桥梁照明|照明系统|灯光照明|照明(采购|项目)|路灯亮化|照明升级|照明设施维护|路灯建设|灯光改造|路灯安装|灯具安装|照明灯|护眼灯|路灯养护|路灯配套|照明设备|亮化建设|泛光照明|灯光工程|^[\u4e00-\u9fa5]{1,3}灯$|[Ll][Ee][Dd]灯",
  78. "广告服务":"广告|宣传|推广|媒体投放|营销策划|广告业",
  79. "财税服务":"财税服务|财务服务|税务服务|审计|会计|记账|税务代理|工商代理|资产评估|资产盘点|资产[\u4e00-\u9fa5]{1,2}清查|财务咨询|财务评估|财务顾问|竣工决算|财务决算|涉税评估|财务清理|结算审核",
  80. "金融服务":"保险|金融服务|车辆?险|责任险|财险|资金存放|[债证]券承销|融资券|^[\u4e00-\u9fa5]{1,3}险$|意外险|医疗险|伤害险|灾险|不动产险|农业险|综合险|寿险|财产险|分红险|存放银行|融资咨询|信用评[价级]",
  81. "车辆服务":"车辆?[))】\]]?(定点)?(服务|购买|询价|采购|租|维修|保养|维保|购置|更新|更换|运行维护|加油|保险|配件)|租车|修车|特种车|专用车|商用车|公务用车|执勤车|(购置|采购|购买|维修|保养|维保|租赁?|更新|租用|更换)[\u4e00-\u9fa5]{1,4}车|执法车|警用车",
  82. "商旅服务":"商旅服务|商旅管理|住宿和餐饮|会议服务|会务服务|酒店用品|商旅管理|团餐|住宿|[商差]旅咨询|商务考察|会议安排|会议旅游|展览旅游|(酒店|[\u4e00-\u9fa5]{1,2}票)[预代]订|[预代]订(酒店|[\u4e00-\u9fa5]{1,2}票)|商务考察",
  83. "消防服务":"消防行业|消防[\u4e00-\u9fa5]{0,4}(维保|维护|维修|保养|检测|安全评估|风险评估|评估|年度|宣传|设计)|(消防|灭火|防火)(设施|技术|安全|工程|器材|用品|物资|装备|装置|设备|系统|服务|排查|改造|项目|整改|整治)|消火栓|消防水?带|消防腰斧|消防泵|救援应急|应急救援|防汛|三防|抢险|火灾隐患|救灾装备|救灾物资|灭火器",
  84. "工程类项目":"工程管理|造价咨询|工程设计|施工图设计|工程咨询|工程规划|工程监理|施工总?承包|(升级|改造)(工程|项目)|工程款|施工管理|工程采购|施工进度(管理|监控)|建筑工程|装饰工程|市政工程|公路工程|桥梁工程|水利工程|能源工程|专业施工|修缮工程|工程服务|工程技术与设计|工程评价|工程造价|勘察|勘测|装修工程|工程(项目|总?承包)?$|EPC|工程建设|修缮提升|厂房项目|室建设|扩建|装修|农田建设|巩固提升工程|改造提升|总承包|工程建筑|建筑物?拆除|建筑装饰和装修|工程施工|小区改造|(楼|学|房|室|院|站)(工程|改造|修缮|扩建|建设|拆|提升|迁)"
  85. "|立项|项目投资|可行性研究|可研|环境评价|环境影响|环境评测|环评|(水保|环保|环境保护)(编制|验收|监测)|稳定风险|社会稳定|风险评估|(水影响|能源|交通影响|地质灾害|地址灾害|地震安全性|地震安全性|气象|雷击风险|安全|海洋|森林环境)(评[价估测])|水土保持|(水|交|灾|震|能|气|安|海|林)评|(决算书|预算|结算|造价|决算)(编制|咨询)|(施工图(纸|)|初步|项目|工程)(方案|)设计|测绘|勘查|(施工图(纸|)|防雷|消防|人防)审查|施工许可证|施工准备|监理|资格预审|资审|竣工|验收",
  86. "信息系统":"信息系统|智能化|软件|系统集成服务|网络安全|远程管理服务|应用开发|云计算|信息平台|智慧数字|系统开发|信息化建设|智慧课堂|信息技术|网络链路|电子化|大数据|数字化|云系统|管理系统|硬件|(系统|平台)(建设|维护|维保|运维)|人脸识别|智能分析|数据分析|虚拟化|软件开发|(智慧|智能|数字|智数|电子)[^,,。、;;]{0,8}(平台|管理|系统|项目|建设|开发)|编程技术|自动化|数据处理|元数据|网络服务|网络工程|信息管理|服务终端|云平台|数据支撑|图形开发|网络系统|监控[\u4e00-\u9fa5]{0,2}系统|导航系统|互联网接入|互联网平台|网络接入|服务平台|互联网安全",
  87. "医疗产品":"医疗产品|医疗器械|医疗耗材|疫情(防控)?物资",
  88. "土地流转":"土地流转|(地|田|塘|国土|水面|资产).{0,6}(经营权|使用权|租赁|流转|承包|出租|转包)"
  89. }
  90. # 项目标签 关键词
  91. def get_project_label_keywords():
  92. # import csv
  93. path = os.path.dirname(__file__) + '/project_label_keywords.csv'
  94. with open(path, 'r') as f:
  95. reader = csv.reader(f)
  96. key_word_list = []
  97. for r in reader:
  98. if r[0] == '类型':
  99. continue
  100. _type = r[0]
  101. key_wrod = r[1]
  102. key_paichuci = str(r[2])
  103. key_paichuci = key_paichuci if key_paichuci and key_paichuci != 'nan' else ""
  104. type_paichuci = str(r[3])
  105. type_paichuci = type_paichuci if type_paichuci and type_paichuci != 'nan' else ""
  106. key_word_list.append((_type, key_wrod, key_paichuci, type_paichuci))
  107. return key_word_list
  108. class PreprojectFill():
  109. def __init__(self):
  110. self.ots_client = getConnect_ots()
  111. self.task_queue = Queue(3000)
  112. self.fill_concat_queue = Queue(10000)
  113. self.purchaseIntention_process_queue = Queue(3000)
  114. self.re_medical_product = get_medical_product_list()
  115. self.product_classifier_dict = get_product_classifier_dict()
  116. self.project_label_keywords = get_project_label_keywords()
  117. log("PurchaseIntentionProcess load keywords files sucess")
  118. def fill_comsumer(self):
  119. def comsumer_handle(_row,result_queue):
  120. if _row.get(preproject_uuid) is None or _row.get("status") is None:
  121. _preproject = Preproject(_row)
  122. # 删除无uuid数据
  123. _preproject.delete_row(self.ots_client)
  124. return
  125. if _row.get(preproject_has_bidfile) is None:
  126. json_docids = _row.get(preproject_json_docids)
  127. if json_docids is not None:
  128. docids = json.loads(json_docids)
  129. list_docids = []
  130. for a in docids[:30]:
  131. for b in a.split(","):
  132. list_docids.append(b)
  133. atta_query = BoolQuery(should_queries=[TermQuery("docids",_d) for _d in list_docids[:100]])
  134. atta_b_q = BoolQuery(must_queries=[TermQuery("classification","招标文件"),atta_query])
  135. atta_rows,atta_next_token,atta_total_count,_ = self.ots_client.search("attachment","attachment_index",
  136. SearchQuery(atta_b_q,get_total_count=True,limit=1),
  137. ColumnsToGet(["docids"],ColumnReturnType.SPECIFIED))
  138. if atta_total_count>0:
  139. _row[preproject_has_bidfile] = 1
  140. else:
  141. _row[preproject_has_bidfile] = 0
  142. _preproject = Preproject(_row)
  143. _preproject.update_row(self.ots_client)
  144. _mul = MultiThreadHandler(self.task_queue,comsumer_handle,None,10)
  145. _mul.run()
  146. def fill_producer(self):
  147. # 存在uuid数据,补充'has_bidfile'字段
  148. q1 = BoolQuery(must_queries=[
  149. ExistsQuery("uuid"),
  150. BoolQuery(must_not_queries=[
  151. ExistsQuery("has_bidfile")
  152. ])
  153. ])
  154. # 无uuid数据,用于删除行数据
  155. # q2 = BoolQuery(must_not_queries=[
  156. # ExistsQuery("uuid")
  157. # ])
  158. q2 = BoolQuery(should_queries=
  159. [ BoolQuery(must_not_queries=[ExistsQuery("uuid")]),
  160. BoolQuery(must_not_queries=[ExistsQuery("status")])
  161. ])
  162. columns = ["uuid","has_bidfile","json_docids","status"]
  163. query = BoolQuery(should_queries=[q1,
  164. q2])
  165. rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
  166. SearchQuery(query,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
  167. ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  168. if len(rows) > 0:
  169. dict_rows = getRow_ots(rows)
  170. for _row in dict_rows:
  171. self.task_queue.put(_row)
  172. while next_token:
  173. rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
  174. SearchQuery(query,next_token=next_token,get_total_count=True,limit=100),
  175. ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  176. if len(rows)>0:
  177. dict_rows = getRow_ots(rows)
  178. for _row in dict_rows:
  179. self.task_queue.put(_row)
  180. def fill_contact_producer(self):
  181. q1 = BoolQuery(must_queries=[
  182. TermQuery("status",1),
  183. ExistsQuery("uuid")
  184. ])
  185. columns = ["status",preproject_tenderee,preproject_last_tenderee_contact,preproject_last_tenderee_phone,preproject_last_win_tenderer,preproject_last_win_tenderer_contact,preproject_last_win_tenderer_phone]
  186. query = q1
  187. rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
  188. SearchQuery(query,sort=Sort(sorters=[FieldSort("crtime",SortOrder.DESC)]),get_total_count=True,limit=100),
  189. ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  190. dict_rows = getRow_ots(rows)
  191. for _row in dict_rows:
  192. self.fill_concat_queue.put(_row)
  193. while next_token:
  194. rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
  195. SearchQuery(query,next_token=next_token,get_total_count=True,limit=100),
  196. ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  197. dict_rows = getRow_ots(rows)
  198. for _row in dict_rows:
  199. self.fill_concat_queue.put(_row)
  200. def fill_contact_comsumer(self):
  201. def comsumer_handle(_row,result_queue):
  202. product = _row.get(preproject_product)
  203. tenderee = _row.get(preproject_tenderee)
  204. tenderee_concat = _row.get(preproject_last_tenderee_contact)
  205. tenderee_phone = _row.get(preproject_last_tenderee_phone)
  206. win_tenderer = _row.get(preproject_last_win_tenderer)
  207. win_tenderer_concat = _row.get(preproject_last_win_tenderer_contact)
  208. win_tenderer_phone = _row.get(preproject_last_win_tenderer_phone)
  209. if tenderee is not None and tenderee!="":
  210. # if (tenderee_concat is None or tenderee_concat=="") and (tenderee_phone is None or tenderee_phone==""):
  211. if tenderee_phone is None or tenderee_phone=="":
  212. #fill tenderee concat and phone
  213. bool_query = BoolQuery(must_queries=[
  214. TermQuery(preproject_tenderee,tenderee),
  215. BoolQuery(should_queries=[
  216. MatchPhraseQuery("doctextcon",product),
  217. MatchPhraseQuery("attachmenttextcon",product)
  218. ])
  219. ])
  220. rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
  221. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=100),
  222. ColumnsToGet(["tenderee_contact","tenderee_phone"],return_type=ColumnReturnType.SPECIFIED))
  223. list_data = getRow_ots(rows)
  224. _find = False
  225. for _data in list_data:
  226. tenderee_contact = _data.get("tenderee_contact")
  227. tenderee_phone = _data.get("tenderee_phone")
  228. # if (tenderee_contact is not None and tenderee_contact!="") or (tenderee_phone is not None and tenderee_phone!=""):
  229. if tenderee_phone is not None and tenderee_phone!="":
  230. _find = True
  231. _row[preproject_last_tenderee_contact] = tenderee_contact
  232. _row[preproject_last_tenderee_phone] = tenderee_phone
  233. break
  234. if not _find:
  235. bool_query = BoolQuery(must_queries=[
  236. TermQuery("status", 1),
  237. TermQuery("enterprise_name",tenderee)
  238. ])
  239. rows,next_token,total_count,is_all_succeed = self.ots_client.search("enterprise_contact","enterprise_contact_index",
  240. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("score",SortOrder.DESC)]),limit=5),
  241. ColumnsToGet(["contact_person","phone_no"],return_type=ColumnReturnType.SPECIFIED))
  242. list_data = getRow_ots(rows)
  243. if len(list_data)>0:
  244. # _data = list_data[0]
  245. for _data in list_data:
  246. tenderee_contact = _data.get("contact_person")
  247. tenderee_phone = _data.get("phone_no")
  248. if tenderee_phone is not None and tenderee_phone!="":
  249. _row[preproject_last_tenderee_contact] = tenderee_contact
  250. _row[preproject_last_tenderee_phone] = tenderee_phone
  251. break
  252. if win_tenderer is not None and win_tenderer!="":
  253. # if (win_tenderer_concat is None or win_tenderer_concat!="") and (win_tenderer_phone is None or win_tenderer_phone==""):
  254. if win_tenderer_phone is None or win_tenderer_phone=="":
  255. # fill win_tenderer concat and phone
  256. bool_query = BoolQuery(must_queries=[
  257. TermQuery("win_tenderer",win_tenderer),
  258. BoolQuery(should_queries=[
  259. MatchPhraseQuery("doctextcon",product),
  260. MatchPhraseQuery("attachmenttextcon",product)
  261. ])
  262. ])
  263. rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
  264. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=100),
  265. ColumnsToGet(["win_tenderer_manager","win_tenderer_phone"],return_type=ColumnReturnType.SPECIFIED))
  266. list_data = getRow_ots(rows)
  267. _find = False
  268. for _data in list_data:
  269. _contact = _data.get("win_tenderer_manager")
  270. _phone = _data.get("win_tenderer_phone")
  271. # if (_contact is not None and _contact!="") or (_phone is not None and _phone!=""):
  272. if _phone is not None and _phone!="" and _phone!=_row.get(preproject_last_tenderee_phone):
  273. _find = True
  274. _row[preproject_last_win_tenderer_contact] = _contact
  275. _row[preproject_last_win_tenderer_phone] = _phone
  276. break
  277. if not _find:
  278. bool_query = BoolQuery(must_queries=[
  279. TermQuery("status", 1),
  280. TermQuery("enterprise_name",win_tenderer)
  281. ])
  282. rows,next_token,total_count,is_all_succeed = self.ots_client.search("enterprise_contact","enterprise_contact_index",
  283. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("score",SortOrder.DESC)]),limit=5),
  284. ColumnsToGet(["contact_person","phone_no"],return_type=ColumnReturnType.SPECIFIED))
  285. list_data = getRow_ots(rows)
  286. if len(list_data)>0:
  287. # _data = list_data[0]
  288. for _data in list_data:
  289. _contact = _data.get("contact_person")
  290. _phone = _data.get("phone_no")
  291. # _phone不为空,且不等于招标人电话
  292. if _phone is not None and _phone!="" and _phone!=_row.get(preproject_last_tenderee_phone):
  293. _row[preproject_last_win_tenderer_contact] = _contact
  294. _row[preproject_last_win_tenderer_phone] = _phone
  295. break
  296. _row["status"] = 0
  297. _preproject = Preproject(_row)
  298. _preproject.setValue("status",2,True)
  299. _preproject.update_row(self.ots_client)
  300. _mul = MultiThreadHandler(self.fill_concat_queue,comsumer_handle,None,20)
  301. _mul.run()
  302. # 预测产品分类 project_name,demand,doctitle
  303. def get_product_classification(self, tenderee, product, project_name, last_project_name, demand, info_type):
  304. product = product.replace(tenderee, "")
  305. project_name = project_name if project_name else ""
  306. project_name = project_name.replace(tenderee, "")
  307. last_project_name = last_project_name if last_project_name else ""
  308. last_project_name = last_project_name.replace(tenderee, "")
  309. if not project_name:
  310. project_name = last_project_name
  311. demand = demand if demand else ""
  312. demand = demand.replace(tenderee, "")
  313. info_type = info_type if info_type else ""
  314. class_list = []
  315. if not class_list:
  316. for key, value in service_key_word.items():
  317. if re.search(value, product):
  318. class_list.append(key)
  319. for key, value in product_key_word.items():
  320. if re.search(value, product):
  321. class_list.append(key)
  322. for key, value in self.product_classifier_dict.items():
  323. if re.search(value, product):
  324. class_list.append(key)
  325. if "医疗产品" not in class_list and re.search(self.re_medical_product, product):
  326. class_list.append("医疗产品")
  327. # project_name
  328. if project_name and len(class_list) == 0:
  329. for key, value in service_key_word.items():
  330. if re.search(value, project_name):
  331. class_list.append(key)
  332. for key, value in product_key_word.items():
  333. if re.search(value, project_name):
  334. class_list.append(key)
  335. for key, value in self.product_classifier_dict.items():
  336. if re.search(value, product):
  337. class_list.append(key)
  338. if "医疗产品" not in class_list and re.search(self.re_medical_product, project_name):
  339. class_list.append("医疗产品")
  340. # info_type
  341. if info_type and len(class_list) == 0:
  342. for key, value in service_key_word.items():
  343. if re.search(value, info_type):
  344. class_list.append(key)
  345. for key, value in product_key_word.items():
  346. if re.search(value, info_type):
  347. class_list.append(key)
  348. for key, value in self.product_classifier_dict.items():
  349. if re.search(value, product):
  350. class_list.append(key)
  351. if "医疗产品" not in class_list and re.search(self.re_medical_product, info_type):
  352. class_list.append("医疗产品")
  353. # demand
  354. if demand and len(class_list) == 0:
  355. for key, value in service_key_word.items():
  356. if re.search(value, demand):
  357. class_list.append(key)
  358. for key, value in product_key_word.items():
  359. if re.search(value, demand):
  360. class_list.append(key)
  361. for key, value in self.product_classifier_dict.items():
  362. if re.search(value, product):
  363. class_list.append(key)
  364. if "医疗产品" not in class_list and re.search(self.re_medical_product, demand):
  365. class_list.append("医疗产品")
  366. class_list = list(set(class_list))
  367. class_list.sort(key=lambda x: x)
  368. class_list = class_list[:3]
  369. class_list = ",".join(class_list)
  370. return class_list
  371. # 项目标签
  372. def get_project_label(self, product, demand, tenderee, agency):
  373. product = product if product else ""
  374. demand = demand if demand else ""
  375. tenderee = re.sub("\s", "", tenderee) if tenderee else ""
  376. agency = re.sub("\s", "", agency) if agency else ""
  377. # main_text = ",".join(list(set([product,last_doctitle,demand,project_name,last_project_name])))
  378. main_text = ",".join(list(set([product, demand])))
  379. main_text = re.sub("\s", "", main_text)
  380. doctitle = product
  381. doctitle = re.sub("\s", "", doctitle)
  382. # 查询字段排除tenderee、agency
  383. if tenderee:
  384. doctitle = doctitle.replace(tenderee, "")
  385. main_text = main_text.replace(tenderee, "")
  386. if agency:
  387. doctitle = doctitle.replace(agency, "")
  388. main_text = main_text.replace(agency, "")
  389. doctitle_dict = dict()
  390. main_text_dict = dict()
  391. for item in self.project_label_keywords:
  392. _type = item[0]
  393. key_wrod = item[1]
  394. # 关键词排除词
  395. key_paichuci = item[2]
  396. key_paichuci_s = "|".join(key_paichuci.split('、'))
  397. # 类型排除词
  398. type_paichuci = item[3]
  399. if type_paichuci:
  400. paichuci_split = type_paichuci.split('、')
  401. if re.search("|".join(paichuci_split), main_text):
  402. continue
  403. if doctitle:
  404. if key_wrod in doctitle:
  405. if not key_paichuci_s or (key_paichuci_s and not re.search(key_paichuci_s, doctitle)):
  406. key_wrod_count1 = doctitle.count(key_wrod)
  407. if _type not in doctitle_dict:
  408. doctitle_dict[_type] = {'关键词': [], '排除词': type_paichuci}
  409. doctitle_dict[_type]['关键词'].append((key_wrod, key_wrod_count1))
  410. if main_text:
  411. if key_wrod in main_text:
  412. if not key_paichuci_s or (key_paichuci_s and not re.search(key_paichuci_s, main_text)):
  413. key_wrod_count2 = main_text.count(key_wrod)
  414. if _type not in main_text_dict:
  415. main_text_dict[_type] = {'关键词': [], '排除词': type_paichuci}
  416. main_text_dict[_type]['关键词'].append((key_wrod, key_wrod_count2))
  417. # 排序 doctitle
  418. for k, v in doctitle_dict.items():
  419. doctitle_dict[k]['关键词'].sort(key=lambda x: x[1], reverse=True)
  420. # 按匹配次数保留前10个标签
  421. if len(doctitle_dict) > 10:
  422. doctitle_labels = [(k, sum(w[1] for w in doctitle_dict[k]['关键词'])) for k in doctitle_dict]
  423. doctitle_labels.sort(key=lambda x: x[1], reverse=True)
  424. for item in doctitle_labels[10:]:
  425. doctitle_dict.pop(item[0])
  426. # main_text
  427. pop_list = []
  428. for k, v in main_text_dict.items():
  429. if sum([j[1] for j in main_text_dict[k]['关键词']]) == 1:
  430. pop_list.append(k)
  431. main_text_dict[k]['关键词'].sort(key=lambda x: x[1], reverse=True)
  432. # if len(pop_list)<len(main_text_dict):
  433. # for k in pop_list:
  434. # main_text_dict.pop(k)
  435. # 按匹配次数保留前10个标签
  436. if len(main_text_dict) > 10:
  437. main_text_labels = [(k, sum(w[1] for w in main_text_dict[k]['关键词'])) for k in main_text_dict]
  438. main_text_labels.sort(key=lambda x: x[1], reverse=True)
  439. for item in main_text_labels[10:]:
  440. main_text_dict.pop(item[0])
  441. doctitle_dict = [i for i in doctitle_dict.keys()]
  442. doctitle_dict.sort(key=lambda x: x)
  443. doctitle_dict = ','.join(doctitle_dict) if doctitle_dict else None
  444. main_text_dict = [i for i in main_text_dict.keys()]
  445. main_text_dict.sort(key=lambda x: x)
  446. main_text_dict = ','.join(main_text_dict) if main_text_dict else None
  447. return doctitle_dict, main_text_dict
  448. def purchaseIntention_process_comsumer(self):
  449. def comsumer_handle(_row,result_queue):
  450. docid = _row.get('docid')
  451. tenderee = _row.get('tenderee')
  452. demand_info = _row.get('demand_info')
  453. project_name = _row.get('project_name')
  454. if demand_info is not None and 'data":[]' not in demand_info:
  455. result = process_purchaseIntention(docid, tenderee, demand_info, project_name)
  456. for item in result:
  457. _tenderee, final_product, product_list, order_begin, order_end, demand, _project_name, budget, json_docids = item
  458. if order_begin and _tenderee is not None and len(_tenderee)>3 and final_product is not None and len(final_product)>1:
  459. province = _row.get('province')
  460. city = _row.get('city')
  461. district = _row.get('district')
  462. doctitle = _row.get('doctitle')
  463. tenderee_contact = _row.get('tenderee_contact')
  464. tenderee_phone = _row.get('tenderee_phone')
  465. info_type = _row.get('info_type')
  466. result_row = dict()
  467. result_row['tenderee'] = _tenderee
  468. result_row['product'] = final_product
  469. result_row['may_begin'] = order_begin
  470. result_row['may_end'] = order_end
  471. result_row['crtime'] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
  472. result_row['type'] = 1
  473. result_row['demand'] = demand
  474. result_row['project_name'] = _project_name
  475. result_row['bidding_budget'] = budget
  476. result_row['prob'] = 1.0
  477. result_row['json_docids'] = json_docids
  478. result_row['province'] = province
  479. result_row['city'] = city
  480. result_row['district'] = district
  481. result_row['in_doctextcon'] = 0
  482. result_row['last_doctitle'] = doctitle
  483. result_row['last_tenderee_contact'] = tenderee_contact
  484. result_row['last_tenderee_phone'] = tenderee_phone
  485. result_row['status'] = 1
  486. result_row['uuid'] = str(uuid4())
  487. result_row['product_classification'] = self.get_product_classification(_tenderee, final_product, _project_name, "", demand, info_type)
  488. doctitle_product_labels,core_field_product_labels = self.get_project_label(final_product, demand, _tenderee, "")
  489. result_row['doctitle_product_labels'] = doctitle_product_labels
  490. result_row['core_field_product_labels'] = core_field_product_labels
  491. # tenderee, product, may_begin, may_end, crtime, type, demand, project_name, bidding_budget, prob, json_docids, province, city, district,
  492. # in_doctextcon, last_doctitle, last_tenderee_contact, last_tenderee_phone
  493. _preproject = Preproject(result_row)
  494. if not _preproject.exists_row(self.ots_client):
  495. _preproject.update_row(self.ots_client)
  496. _mul = MultiThreadHandler(self.purchaseIntention_process_queue,comsumer_handle,None,20)
  497. _mul.run()
  498. def purchaseIntention_process_producer(self):
  499. columns = ['tenderee','demand_info','project_name','province', 'city', 'district','doctitle', 'tenderee_contact',
  500. 'tenderee_phone', 'info_type']
  501. end_page_time = time.strftime("%Y-%m-%d",time.localtime())
  502. start_page_time = timeAdd(end_page_time,days=-5)
  503. start_opertime = end_page_time + " 00:00:00" # start_opertime默认值为当天0点
  504. end_opertime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
  505. log_path = log_purchaseIntentionProcess_opertime_path
  506. all_log_msg = []
  507. with open(log_path, mode='r', encoding='utf-8') as f:
  508. all_log_msg = f.readlines()
  509. if all_log_msg:
  510. if all_log_msg[-1]=='\n':
  511. all_log_msg = all_log_msg[:-1]
  512. last_opertime = ""
  513. for _log in all_log_msg[::-1]:
  514. if "end opertime time:" in _log:
  515. try:
  516. last_opertime = re.search('(?:end opertime time:)(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})',_log).group(1)
  517. break
  518. except:
  519. continue
  520. if last_opertime:
  521. start_opertime = last_opertime
  522. log_msg = "PurchaseIntention_process start, start opertime:%s, end opertime time:%s " % (start_opertime,end_opertime)
  523. all_log_msg = all_log_msg[-2000:]
  524. all_log_msg.append(log_msg + '\n')
  525. if all_log_msg:
  526. with open(log_path, mode='w', encoding='utf-8') as f:
  527. f.writelines(all_log_msg)
  528. query = BoolQuery(must_queries=[
  529. TermQuery('save',1), # 去重数据
  530. TermQuery('docchannel',114), # 采购意向类型公告
  531. RangeQuery('page_time',range_from=start_page_time,range_to=end_page_time,include_upper=True),
  532. RangeQuery('opertime',range_from=start_opertime,range_to=end_opertime)
  533. ])
  534. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  535. SearchQuery(query,sort=Sort(sorters=[FieldSort("page_time")]),get_total_count=True,limit=100),
  536. ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  537. if len(rows) > 0:
  538. dict_rows = getRow_ots(rows)
  539. for _row in dict_rows:
  540. self.purchaseIntention_process_queue.put(_row)
  541. log("purchaseIntention_process ots query total_count:%d"%total_count)
  542. while next_token:
  543. rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_tmp", "document_tmp_index",
  544. SearchQuery(query,next_token=next_token,
  545. get_total_count=True,limit=100),
  546. ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  547. if len(rows) > 0:
  548. dict_rows = getRow_ots(rows)
  549. for _row in dict_rows:
  550. self.purchaseIntention_process_queue.put(_row)
  551. def schedule(self):
  552. _scheduler = BlockingScheduler()
  553. _scheduler.add_job(self.fill_producer,"cron",minute="*/10")
  554. _scheduler.add_job(self.fill_comsumer,"cron",minute="*/10")
  555. _scheduler.add_job(self.fill_contact_producer,"cron",minute="*/1")
  556. _scheduler.add_job(self.fill_contact_comsumer,"cron",minute="*/1")
  557. _scheduler.add_job(self.purchaseIntention_process_producer, "cron", minute="*/30")
  558. _scheduler.add_job(self.purchaseIntention_process_comsumer, "cron", minute="*/30")
  559. _scheduler.start()
  560. def start_fill_preproject():
  561. preprojectFill = PreprojectFill()
  562. preprojectFill.schedule()
  563. # 采购意向数据处理
  564. def process_purchaseIntention(docid, tenderee, json_demand_info, doc_project_name):
  565. def getProduct(tenderee, _product, project_name, doc_project_name):
  566. if len(_product) > 0:
  567. _product.sort(key=lambda x: len(x), reverse=True)
  568. return _product[0]
  569. else:
  570. product = ""
  571. pj_name = str(project_name).replace(tenderee, "")
  572. pj_name = re.sub(".*公司|项目|采购", "", pj_name)
  573. if len(pj_name) < 25:
  574. product = pj_name
  575. if not product:
  576. if doc_project_name:
  577. doc_project_name = str(doc_project_name).replace(tenderee, "")
  578. doc_project_name = re.sub(".*公司|项目|采购", "", doc_project_name)
  579. product = doc_project_name
  580. return product
  581. # 时间格式范围准确性
  582. def formatTime(_date):
  583. if _date is not None:
  584. _d = _date.split("-")
  585. if len(_d) == 3:
  586. if int(_d[0]) > 2010 and int(_d[0]) < 2050 and int(_d[1]) > 0 and int(_d[1]) <= 12 and int(
  587. _d[2]) > 0 and int(_d[2]) <= 31:
  588. return "%s-%s-%s" % (_d[0].rjust(4, "2"), _d[1].rjust(2, "0"), _d[2].rjust(2, "0"))
  589. # 时间范围规范性
  590. def get_begin_end_time(order_begin, order_end):
  591. try:
  592. order_begin = time.mktime(time.strptime(order_begin, "%Y-%m-%d"))
  593. order_end = time.mktime(time.strptime(order_end, "%Y-%m-%d"))
  594. mid_time = (order_end - order_begin) / (24 * 60 * 60)
  595. if mid_time >= 0 and mid_time < 125:
  596. return True
  597. else:
  598. return False
  599. except:
  600. return False
  601. def process(docid, tenderee, json_demand_info, doc_project_name):
  602. if json_demand_info is None:
  603. return
  604. try:
  605. demand_info = json.loads(json_demand_info)
  606. except Exception as e:
  607. return
  608. result_list = []
  609. for _line in demand_info["data"]:
  610. try:
  611. _product = _line.get("product", [])
  612. _product.sort(key=lambda x: x)
  613. product_list = json.dumps(_product, ensure_ascii=False)
  614. order_end = _line.get("order_end")
  615. order_end = formatTime(order_end)
  616. project_name = _line.get("project_name", "")
  617. project_name = re.sub("\s", "", project_name)
  618. demand = _line.get("demand")
  619. budget = _line.get("budget")
  620. _tenderee = _line.get("tenderee", "")
  621. _tenderee = re.sub("\s", "", _tenderee)
  622. if _tenderee:
  623. # 优先选择表格中对应的采购单位而不是全文提取到的
  624. tenderee = _tenderee
  625. if budget:
  626. budget = float(budget)
  627. else:
  628. budget = 0.0
  629. order_begin = _line.get("order_begin")
  630. order_begin = formatTime(order_begin)
  631. if order_begin and order_end:
  632. if not get_begin_end_time(order_begin, order_end):
  633. continue
  634. if not order_begin or not order_end:
  635. continue
  636. product = getProduct(tenderee, _product, project_name, doc_project_name)
  637. product = re.sub("\s", "", product)
  638. json_docids = json.dumps([str(docid)], ensure_ascii=False)
  639. final_product = ""
  640. if project_name:
  641. # 优先使用 project_name作为产品
  642. final_product = project_name
  643. else:
  644. final_product = product
  645. if final_product:
  646. result_list.append([tenderee, final_product, product_list, order_begin, order_end, demand, project_name,budget, json_docids])
  647. except Exception as e:
  648. # logging.info("============error:%s" % (str(e)))
  649. pass
  650. return result_list
  651. result = process(docid, tenderee, json_demand_info, doc_project_name)
  652. if not result:
  653. return []
  654. else:
  655. return result
  656. def delete_wrong_data():
  657. import pandas as pd
  658. list_data = []
  659. task_queue = Queue()
  660. ots_client = getConnect_ots()
  661. q1 = BoolQuery(must_queries=[
  662. WildcardQuery("may_begin","000*"),
  663. # RangeQuery("crtime",'2022-05-24')
  664. ])
  665. rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
  666. SearchQuery(q1,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
  667. ColumnsToGet(return_type=ColumnReturnType.ALL))
  668. dict_rows = getRow_ots(rows)
  669. list_data.extend(dict_rows)
  670. _count = len(dict_rows)
  671. print("total_count",total_count)
  672. while next_token:
  673. rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
  674. SearchQuery(q1,next_token=next_token,get_total_count=True,limit=100),
  675. ColumnsToGet(return_type=ColumnReturnType.ALL))
  676. dict_rows = getRow_ots(rows)
  677. list_data.extend(dict_rows)
  678. _count += len(dict_rows)
  679. print("%d/%d"%(_count,total_count))
  680. # if _count>10000:
  681. # break
  682. # df = pd.read_csv("fa45de36-6e47-4817-b2d6-e79ea8c154601.csv")
  683. # for tenderee,product,may_begin,may_end in zip(df["tenderee"],df["product"],df["may_begin"],df["may_end"]):
  684. # _data = {"tenderee":tenderee,
  685. # "product":product,
  686. # "may_begin":str(may_begin),
  687. # "may_end":str(may_end)}
  688. # list_data.append(_data)
  689. for _data in list_data:
  690. # may_begin = _data.get("may_begin")
  691. # may_end = _data.get("may_end")
  692. # if len(may_begin)!=10 or len(may_end)!=10:
  693. # task_queue.put(_data)
  694. task_queue.put(_data)
  695. def _handle(item,result_queue):
  696. _preproject = Preproject(item)
  697. _preproject.delete_row(ots_client)
  698. print(item)
  699. item["may_begin"] = item["may_begin"].replace("0001","2022")
  700. item["may_end"] = item["may_end"].replace("0001","2022")
  701. _preproject = Preproject(item)
  702. _preproject.update_row(ots_client)
  703. print(item)
  704. log("====%d"%task_queue.qsize())
  705. mt = MultiThreadHandler(task_queue,_handle,None,30)
  706. mt.run()
  707. def fixProductName():
  708. from BaseDataMaintenance.dataSource.source import getConnect_ots
  709. ots_client = getConnect_ots()
  710. bool_query = BoolQuery(must_queries=[MatchPhraseQuery("product","酒水降尘")])
  711. rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
  712. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
  713. ColumnsToGet(return_type=ColumnReturnType.ALL))
  714. dict_rows = getRow_ots(rows)
  715. for _row in dict_rows:
  716. preproject1 = Preproject(_row)
  717. preproject1.delete_row(ots_client)
  718. _row["product"] = _row["product"].replace("酒水降尘","洒水降尘")
  719. preproject2 = Preproject(_row)
  720. preproject2.update_row(ots_client)
  721. print(dict_rows)
  722. if __name__=="__main__":
  723. # preprojectFill = PreprojectFill()
  724. # preprojectFill.schedule()
  725. # delete_wrong_data()
  726. # fixProductName()
  727. start_fill_preproject()
  728. pass