|
@@ -25,6 +25,8 @@ from BaseDataMaintenance.AIUtils.DoubaoUtils import chat_doubao,get_json_from_te
|
|
|
from BaseDataMaintenance.AIUtils.html2text import html2text_with_tablehtml
|
|
|
from BaseDataMaintenance.AIUtils.prompts import get_prompt_extract_role
|
|
|
|
|
|
+from BaseDataMaintenance.common.Utils import getUnifyMoney
|
|
|
+
|
|
|
class ActiveMQListener():
|
|
|
|
|
|
def __init__(self,conn,_queue,*args,**kwargs):
|
|
@@ -835,16 +837,45 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
|
|
|
|
|
|
self.list_extract_comsumer = []
|
|
|
+ self.list_extract_ai_comsumer = []
|
|
|
|
|
|
# for _i in range(self.comsumer_count):
|
|
|
# listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
|
|
|
# createComsumer(listener_extract,self.mq_extract)
|
|
|
# self.list_extract_comsumer.append(listener_extract)
|
|
|
|
|
|
+ # 提取listener
|
|
|
if create_listener:
|
|
|
for ii in range(10):
|
|
|
listener_p = Process(target=self.start_extract_listener)
|
|
|
listener_p.start()
|
|
|
+ # listener_p_ai = Thread(target=self.start_extract_AI_listener)
|
|
|
+ # listener_p_ai.start()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def start_extract_AI_listener(self,_count=1):
|
|
|
+
|
|
|
+ self.list_extract_ai_comsumer = []
|
|
|
+
|
|
|
+ for _i in range(_count):
|
|
|
+ listener_extract = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
|
|
|
+ createComsumer(listener_extract,self.mq_extract_ai)
|
|
|
+ self.list_extract_ai_comsumer.append(listener_extract)
|
|
|
+
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ for _i in range(len(self.list_extract_ai_comsumer)):
|
|
|
+ if self.list_extract_ai_comsumer[_i].conn.is_connected():
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ listener = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
|
|
|
+ createComsumer(listener,self.mq_extract_ai)
|
|
|
+ self.list_extract_ai_comsumer[_i] = listener
|
|
|
+ time.sleep(5)
|
|
|
+ except Exception as e:
|
|
|
+ traceback.print_exc()
|
|
|
+
|
|
|
|
|
|
def start_extract_listener(self):
|
|
|
|
|
@@ -1256,7 +1287,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
has_win_tenderer = True
|
|
|
|
|
|
if has_entity:
|
|
|
- if not has_tenderee:
|
|
|
+ if not has_tenderee and docchannel in {"招标公告","中标信息","候选人公示","合同公告","开标记录","验收合同"}:
|
|
|
return True
|
|
|
if not has_win_tenderer and docchannel in {"中标信息","候选人公示","合同公告","开标记录","验收合同"}:
|
|
|
return True
|
|
@@ -1273,8 +1304,13 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
dhtml = Document_html({"partitionkey":item.get("partitionkey"),
|
|
|
"docid":item.get("docid")})
|
|
|
|
|
|
+ _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
|
|
|
+
|
|
|
+ dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
|
|
|
+
|
|
|
extract_times = item.get("extract_times",0)+1
|
|
|
item["extract_times"] = extract_times
|
|
|
+ _extract_json = item.get(document_extract2_extract_json)
|
|
|
|
|
|
_extract_ai = {}
|
|
|
try:
|
|
@@ -1289,12 +1325,122 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
_json = get_json_from_text(result)
|
|
|
|
|
|
if _json is not None:
|
|
|
- _extract_ai = json.loads(_json)
|
|
|
+ try:
|
|
|
+ _extract_ai = json.loads(_json)
|
|
|
+ except Exception as e:
|
|
|
+ pass
|
|
|
+
|
|
|
+ if len(_extract_ai.keys())>0:
|
|
|
+ _new_json,_changed = self.merge_json(_extract_json,_json)
|
|
|
+ if _changed:
|
|
|
+ dtmp.setValue("extract_json_AI",json.dumps(_extract_ai,ensure_ascii=False))
|
|
|
+ dtmp.setValue(document_tmp_dochtmlcon,"",False)
|
|
|
+ dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
|
|
|
+ dtmp.update_row(self.ots_client)
|
|
|
+ if not dhtml.exists_row(self.ots_client):
|
|
|
+ dhtml.update_row(self.ots_client)
|
|
|
+
|
|
|
+ _extract = Document_extract({})
|
|
|
+ _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
|
|
|
+ _extract.setValue(document_extract2_docid,item.get(document_docid))
|
|
|
+ _extract.setValue(document_extract2_extract_json,_new_json,True)
|
|
|
+ _extract.setValue(document_extract2_industry_json,"{}",True)
|
|
|
+ _extract.setValue(document_extract2_status,random.randint(1,50),True)
|
|
|
+ _extract.update_row(self.ots_client)
|
|
|
+
|
|
|
+ log("extract_ai of docid:%d"%(item.get(document_docid)))
|
|
|
+
|
|
|
+ ackMsg(conn,message_id,subscription)
|
|
|
+
|
|
|
except Exception as e:
|
|
|
+ traceback.print_exc()
|
|
|
+ ackMsg(conn,message_id,subscription)
|
|
|
pass
|
|
|
|
|
|
|
|
|
+ def merge_json(self,extract_json,extract_ai_json):
|
|
|
+ _extract = {}
|
|
|
+ if extract_json is not None:
|
|
|
+ try:
|
|
|
+ _extract = json.loads(extract_json)
|
|
|
+ except Exception as e:
|
|
|
+ pass
|
|
|
+ _extract_ai = {}
|
|
|
+ if extract_ai_json is not None:
|
|
|
+ try:
|
|
|
+ _extract_ai = json.loads(extract_ai_json)
|
|
|
+ except Exception as e:
|
|
|
+ pass
|
|
|
+ prem = _extract.get("prem")
|
|
|
+ if prem is None:
|
|
|
+ _extract["prem"] = {}
|
|
|
+ prem = _extract["prem"]
|
|
|
+ Project = prem.get("Project")
|
|
|
+ if Project is None:
|
|
|
+ prem["Project"] = {}
|
|
|
+ Project = prem["Project"]
|
|
|
+ Project_rolelist = Project.get("roleList")
|
|
|
+ if Project_rolelist is None:
|
|
|
+ Project["roleList"] = []
|
|
|
+ Project_rolelist = Project["roleList"]
|
|
|
+
|
|
|
+ has_tenderee = False
|
|
|
|
|
|
+ has_win_tenderee = False
|
|
|
+ for _pack,_pack_value in prem.items():
|
|
|
+ _rolelist = _pack_value.get("roleList",[])
|
|
|
+ for _role in _rolelist:
|
|
|
+ if _role.get("role_name","")=="tenderee":
|
|
|
+ has_tenderee = True
|
|
|
+ if _role.get("role_name","")=="win_tenderer":
|
|
|
+ has_win_tenderee = True
|
|
|
+
|
|
|
+
|
|
|
+ _changed = False
|
|
|
+ if not has_tenderee:
|
|
|
+ _tenderee_ai = _extract_ai.get("招标人")
|
|
|
+ if _tenderee_ai is not None and _tenderee_ai!="":
|
|
|
+ _role_dict = {
|
|
|
+ "role_name": "tenderee",
|
|
|
+ "role_text": _tenderee_ai,
|
|
|
+ }
|
|
|
+ Project_rolelist.append(_role_dict)
|
|
|
+ _changed = True
|
|
|
+ if not has_win_tenderee:
|
|
|
+ list_win = _extract_ai.get("中标信息",[])
|
|
|
+ if len(list_win)>0:
|
|
|
+ for _win_dict_i in range(len(list_win)):
|
|
|
+ _win_dict = list_win[_win_dict_i]
|
|
|
+ _pack = _win_dict.get("标段号","")
|
|
|
+ if _pack=="":
|
|
|
+ if len(list_win)>1:
|
|
|
+ _pack = "AI_%d"%(_win_dict_i)
|
|
|
+ else:
|
|
|
+ _pack = "Project"
|
|
|
+ _win_money = _win_dict.get("中标金额")
|
|
|
+ if _win_money!="":
|
|
|
+ _win_money = getUnifyMoney(_win_money)
|
|
|
+ else:
|
|
|
+ _win_money = 0
|
|
|
+ _win_tenderer = _win_dict.get("中标人名称")
|
|
|
+ if _win_tenderer!="":
|
|
|
+ _role_dict = {
|
|
|
+ "role_name": "win_tenderer",
|
|
|
+ "role_text": _win_tenderer,
|
|
|
+ "role_money":{
|
|
|
+ "role_money": str(_win_money)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ _changed = True
|
|
|
+ if _pack=="Project":
|
|
|
+ Project_rolelist.append(_role_dict)
|
|
|
+ else:
|
|
|
+ prem[_pack] = {
|
|
|
+ "roleList":[
|
|
|
+ _role_dict
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ return json.dumps(_extract,ensure_ascii=False),_changed
|
|
|
|
|
|
|
|
|
|
|
@@ -2039,12 +2185,8 @@ if __name__ == '__main__':
|
|
|
# di.start_dataflow_init()
|
|
|
# transform_attachment()
|
|
|
# del_test_doc()
|
|
|
- # de = Dataflow_ActivteMQ_extract()
|
|
|
+ de = Dataflow_ActivteMQ_extract(create_listener=True)
|
|
|
# de.start_flow_extract()
|
|
|
# fixDoc_to_queue_extract()
|
|
|
# check_data_synchronization()
|
|
|
- # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")
|
|
|
- current_date = getCurrent_date(format="%Y-%m-%d")
|
|
|
- last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
|
|
|
- sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
|
|
|
- print(sql)
|
|
|
+ # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")
|