123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- #coding:UTF8
- from utils.Utils import *
- from dataSource.source import getConnect_ots
- from utils.multiThread import MultiThreadHandler
- from queue import Queue
- import json
- from utils.hashUtil import aesCipher
- import inspect
- from tablestore import *
- set_columns = set()
- list_df_columns = []
- from Crypto.Cipher import PKCS1_v1_5 as Cipher_pksc1_v1_5
- from Crypto.PublicKey import RSA
- def rsa_encrpt(string, public_key):
- rsakey = RSA.importKey(public_key) # 读取公钥
- cipher = Cipher_pksc1_v1_5.new(rsakey)
- # 因为encryptor.encrypt方法其内部就实现了加密再次Base64加密的过程,所以这里实际是通过下面的1和2完成了JSEncrypt的加密方法
- encrypt_text = cipher.encrypt(string.encode()) # 1.对账号密码组成的字符串加密
- return encrypt_text
- def getOneContact(contacts,tojson=True,mobile_first=True,mobile_only=True,high_level_first=True):
- mobile_person = ""
- mobile_no = ''
- phone_person = ""
- phone_no = ''
- if contacts is None:
- return "",""
- try:
- if isinstance(contacts,str):
- list_contacts = json.loads(contacts)
- else:
- list_contacts = contacts
- list_contacts.sort(key=lambda x:x.get("level",0),reverse=True)
- for _contact in list_contacts:
- _level = _contact.get("level")
- is_mobile = _contact.get("is_mobile",0)
- if is_mobile==1:
- _contact["mobile_no"] = _contact.get("phone_no")
- if _contact.get("mobile_no","")!="":
- mobile_person = _contact.get("contact_person","")
- mobile_no = _contact.get("mobile_no","")
- if _level==40:
- if mobile_person!="":
- mobile_person += "(法人)"
- if _contact.get("phone_no","")!="":
- phone_person = _contact.get("contact_person","")
- phone_no = _contact.get("phone_no","")
- if _level==40:
- if phone_person!="":
- phone_person += "(法人)"
- if mobile_first:
- if mobile_no!="" and mobile_person!="":
- return mobile_person,mobile_no
- else:
- if mobile_only:
- return mobile_person,mobile_no
- except Exception as e:
- pass
- return phone_person,phone_no
- def getMobiles(contacts,to_json=True):
- if to_json:
- list_contacts = json.loads(contacts)
- else:
- list_contacts = contacts
- list_result = []
- for _c in list_contacts:
- if _c.get("mobile_no","")!="":
- list_result.append([_c.get("contact_person",""),_c.get("mobile_no")])
- return list_result
- def set_dict_item(_dict,name,v):
- _dict[name] = getLegal_str(v)
- if name not in set_columns:
- set_columns.add(name)
- list_df_columns.append(getLegal_str(name))
- def set_dict_item_columns(set_columns1,list_df_columns1,_dict,name,v):
- _dict[name] = getLegal_str(v)
- if name not in set_columns1:
- set_columns1.add(name)
- list_df_columns1.append(getLegal_str(name))
- def getRowData_document(df_data,rows,set_line,list_keyword,dict_channel):
- # list_data = getRow_ots(rows)
- for row in rows:
- item = {}
- _dict = row
- set_dict_item(item,"docid",_dict.get("docid",""))
- set_dict_item(item,"公告标题",_dict.get("doctitle",""))
- set_dict_item(item,"公告内容",_dict.get("doctextcon",""))
- set_dict_item(item,"附件内容",_dict.get("attachmenttextcon",""))
- set_dict_item(item,"公告类别",dict_channel.get(_dict.get("docchannel",""),""))
- set_dict_item(item,"关键词",",".join(list(set(re.findall("|".join([re.escape(str(a).replace("(","(").replace(")",")")) for a in list_keyword]),re.sub("\s","",str(row.get("doctitle","")+row.get("doctextcon","")+row.get("attachmenttextcon","")).replace("(","(").replace(")",")")))))))
- set_dict_item(item,"产品",_dict.get("product",""))
- set_dict_item(item,"省份",_dict.get("province",""))
- # item["区域"] = "%s-%s-%s"%(_dict.get("province",""),_dict.get("city",""),_dict.get("district",""))
- set_dict_item(item,"城市",_dict.get("city",""))
- set_dict_item(item,"区县",_dict.get("district",""))
- set_dict_item(item,"发布时间",_dict.get("page_time",""))
- set_dict_item(item,"截标时间",_dict.get("time_bidclose",""))
- set_dict_item(item,"开标时间",_dict.get("time_bidopen",""))
- set_dict_item(item,"创建时间",_dict.get("crtime",""))
- set_dict_item(item,"招标方式",_dict.get("bidway",""))
- set_dict_item(item,"行业一级分类",_dict.get("industry",""))
- set_dict_item(item,"行业二级分类",_dict.get("info_type",""))
- set_dict_item(item,"uuid",_dict.get("uuid"))
- set_dict_item(item,"公告标题_refine",re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', _dict.get("doctitle","")))
- set_dict_item(item,"项目编号",_dict.get("project_code",""))
- set_dict_item(item,"招标单位",_dict.get("tenderee",""))
- set_dict_item(item,"招标联系人",_dict.get("tenderee_contact",""))
- set_dict_item(item,"招标联系人电话",_dict.get("tenderee_phone",""))
- set_dict_item(item,"代理单位",_dict.get("agency",""))
- set_dict_item(item,"代理联系人",_dict.get("agency_contact",""))
- set_dict_item(item,"代理联系人电话",_dict.get("agency_phone",""))
- set_dict_item(item,"比地招标公告地址","http://www.bidizhaobiao.com/excel_detail.do?code=%s"%(str(aesCipher.encrypt('{"docid":%d}'%_dict.get("docid")))))
- set_dict_item(item,"开标时间",_dict.get("time_bidopen",""))
- set_dict_item(item,"截标时间",_dict.get("time_bidclose",""))
- sub_docs_json = _dict.get("sub_docs_json")
- set_tenderer = set()
- if sub_docs_json is not None:
- for _doc in json.loads(sub_docs_json):
- if "win_tenderer" in _doc:
- set_dict_item(item,"中标单位",_doc["win_tenderer"])
- if "second_tenderer" in _doc:
- set_tenderer.add(_doc.get("second_tenderer"))
- if "third_tenderer" in _doc:
- set_tenderer.add(_doc.get("third_tenderer"))
- if "win_tenderee_manager" in _doc:
- set_dict_item(item,"中标单位联系人",_doc["win_tenderee_manager"])
- if "win_tenderee_phone" in _doc:
- set_dict_item(item,"中标单位联系电话",_doc["win_tenderee_phone"])
- if "win_bid_price" in _doc and float(0 if _doc["win_bid_price"]=="" else _doc["win_bid_price"])>0:
- set_dict_item(item,"中标金额",_doc["win_bid_price"])
- if "bidding_budget" in _doc and float(0 if _doc["bidding_budget"]=="" else _doc["bidding_budget"])>0:
- set_dict_item(item,"招标金额",_doc["bidding_budget"])
- set_dict_item(item,"入围供应商",",".join(list(set_tenderer)))
- if "招标金额" not in item:
- set_dict_item(item,"招标金额","")
- if "中标金额" not in item:
- set_dict_item(item,"中标金额","")
- if "中标单位" not in item:
- set_dict_item(item,"中标单位","")
- if "中标单位联系人" not in item:
- set_dict_item(item,"中标单位联系人","")
- if "中标单位联系电话" not in item:
- set_dict_item(item,"中标单位联系电话","")
- # if item["中标单位"] not in set_enter:
- # continue
- _line = "%s-%s-%s-%s-%s-%s"%(item["省份"],item["城市"],item["项目编号"],item["招标单位"],item["中标单位"],str(item["中标金额"]))
- # if re.search("[大中小]学|幼儿园|医院|公司",item["招标单位"]) is not None:
- # continue
- # if _line in set_line:
- # continue
- # if _dict.get("docid","") in set_ig_docid:
- # continue
- # if item["招标金额"]=="":
- # continue
- # set_line.add(_line)
- for k,v in item.items():
- if k not in df_data:
- df_data[k] = []
- df_data[k].append(v)
- def getDictEnterprise(list_enterprise,columns_to_get = ["reg_capital","actual_capital","industry","estiblish_time","social_staff_num","zhong_biao_number","tou_biao_number","credit_code"]):
- task_queue = Queue()
- result_queue= Queue()
- for _enterprise in list_enterprise:
- task_queue.put(str(_enterprise))
- def _handle(item,result_queue,ots_client):
- try:
- primary_key = [("name",item)]
- consumed,return_row,next_token = ots_client.get_row("enterprise",primary_key,columns_to_get,None,1)
- dict_data = getRow_ots_primary(return_row)
- if dict_data is not None:
- bool_q = BoolQuery(must_queries=[TermQuery("enterprise_name",item),
- TermQuery("status",1)])
- rows,next_token,total_count,is_allowed = ots_client.search("enterprise_contact","enterprise_contact_index",
- SearchQuery(bool_q,limit=10),
- columns_to_get=ColumnsToGet(["contact_person","phone_no","is_mobile","level","is_legal_person","is_manager","is_shareholder"],ColumnReturnType.SPECIFIED))
- list_contact = getRow_ots(rows)
- dict_data["contacts"] = list_contact
- result_queue.put({item:dict_data})
- except Exception as e:
- traceback.print_exc()
- ots_client = getConnect_ots()
- mt = MultiThreadHandler(task_queue=task_queue,task_handler=_handle,result_queue=result_queue,thread_count=50,ots_client=ots_client)
- mt.run()
- dict_enterprise = {}
- while True:
- try:
- _dict = result_queue.get(False)
- for k,v in _dict.items():
- dict_enterprise[k] = v
- except Exception as e:
- break
- return dict_enterprise
- def splitIntoList(_str,_splitkeys):
- list_words = []
- for _word in re.split(_splitkeys,_str):
- if _word.strip()=="":
- continue
- list_words.append(_word)
- return list_words
- def fixContactPerson(df_data,list_df_columns,get_legal_person=False):
- set_enterprise = set()
- if len(df_data.keys())>0:
- for _tenderee,_agency,_win_tenderer in zip(df_data["招标单位"],df_data["代理单位"],df_data["中标单位"]):
- set_enterprise.add(_tenderee)
- set_enterprise.add(_agency)
- set_enterprise.add(_win_tenderer)
- if "" in set_enterprise:
- set_enterprise.remove("")
- if None in set_enterprise:
- set_enterprise.remove(None)
- dict_enterprise = getDictEnterprise(list(set_enterprise),columns_to_get = ["procurement_system","company_org_type","reg_capital","actual_capital","contacts","estiblish_time","social_staff_num","zhong_biao_number","tou_biao_number","credit_code","legal_person_name","phone_number"])
- # conn = getConnection_oracle()
- # cursor = conn.cursor()
- if len(set_enterprise)>0:
- for _i in range(len(df_data["招标单位"])):
- _enterprise_name = df_data["招标单位"][_i]
- if df_data["招标联系人电话"][_i]=="":
- contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
- if contacts is not None:
- _person,_phone = getOneContact(contacts)
- df_data["招标联系人"][_i] = _person
- df_data["招标联系人电话"][_i] = _phone
- if "信用代码" not in df_data:
- df_data["信用代码"] = []
- df_data["信用代码"].append(dict_enterprise.get(_enterprise_name,{}).get("credit_code",""))
- if "招标人采购系统" not in df_data:
- df_data["招标人采购系统"] = []
- df_data["招标人采购系统"].append(dict_enterprise.get(_enterprise_name,{}).get("procurement_system",""))
- if "招标人类型" not in df_data:
- df_data["招标人类型"] = []
- df_data["招标人类型"].append(dict_enterprise.get(_enterprise_name,{}).get("company_org_type",""))
- # if "原网地址" not in df_data:
- # df_data["原网地址"] = []
- # if df_data["公告类别"][_i]=="招标公告":
- # table_name = "bxkc.T_ZHAO_BIAO_GONG_GAO"
- # else:
- # table_name = "bxkc.T_ZHONG_BIAO_XIN_XI"
- # sql = "select detail_link from %s where id='%s' "%(table_name,df_data["uuid"][_i])
- # cursor.execute(sql)
- # rows = cursor.fetchall()
- # if len(rows)>0:
- # df_data["原网地址"].append(rows[0][0])
- # else:
- # df_data["原网地址"].append("")
- _enterprise_name = df_data["代理单位"][_i]
- if df_data["代理联系人电话"][_i]=="":
- contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
- if contacts is not None:
- _person,_phone = getOneContact(contacts)
- df_data["代理联系人"][_i] = _person
- df_data["代理联系人电话"][_i] = _phone
- _enterprise_name = df_data["中标单位"][_i]
- if get_legal_person:
- _person = dict_enterprise.get(_enterprise_name,{}).get("legal_person_name","")
- _phone = dict_enterprise.get(_enterprise_name,{}).get("phone_number","")
- if len(_phone)==11 and _phone[0]=="1":
- df_data["中标单位联系人"][_i] = _person
- df_data["中标单位联系电话"][_i] = _phone
- else:
- if df_data["中标单位联系电话"][_i]=="":
- contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
- if contacts is not None:
- _person,_phone = getOneContact(contacts,mobile_only=True)
- df_data["中标单位联系人"][_i] = _person
- df_data["中标单位联系电话"][_i] = _phone
- list_df_columns.extend(['信用代码','招标人采购系统','招标人类型'])
- def generateBoolShouldQuery(list_field,list_should_words,cls):
- list_should_q = []
- assert isinstance(list_field,(list))
- assert isinstance(list_should_words,(list))
- assert cls in (TermQuery,MatchPhraseQuery,WildcardQuery,RangeQuery)
- for word in list_should_words:
- for field in list_field:
- if cls in (RangeQuery,):
- list_should_q.append(cls(field,*field))
- if cls in (WildcardQuery,):
- list_should_q.append(cls(field,"*%s*"%word))
- else:
- list_should_q.append(cls(field,word))
- return BoolQuery(should_queries=list_should_q)
- # excel 数据处理库
- import openpyxl
- # excel 数据样式设置类
- from openpyxl.styles import Font, PatternFill, Border, Side, Alignment
- from openpyxl.styles import Border, Side, colors
- import pandas as pd
- from openpyxl.utils import get_column_letter
- from openpyxl.styles import Font, Alignment
- import os
- def my_border(t_border, b_border, l_border, r_border):
- border = Border(top=Side(border_style=t_border, color=colors.BLACK),
- bottom=Side(border_style=b_border, color=colors.BLACK),
- left=Side(border_style=l_border, color=colors.BLACK),
- right=Side(border_style=r_border, color=colors.BLACK))
- return border
- #初始化制定区域边框为所有框线
- def format_border(ws,s_column, s_index, e_column , e_index):
- for row in tuple(ws[s_column + str(s_index):e_column + str(e_index)]):
- for cell in row:
- cell.border = my_border('thin', 'thin', 'thin', 'thin')
- def adjust_excel(source,target,columns=["A","B","C","D"]):
- wb = openpyxl.load_workbook(source)
- for sheet in wb.sheetnames:
- ws = wb[sheet]
- df = pd.read_excel(source,sheet_name=sheet)
- # 把表头改到最后一行
- df.loc[len(df)]=list(df.columns)
- list_row_height = []
- for col in df.columns:
- # 获取列序号
- index = list(df.columns).index(col)
- # 获取行字母表头
- letter = get_column_letter(index+1)
- # 获取当前列最大宽度
- collen = df[col].apply(lambda x :len(str(x).encode())).max()
- # 设置列宽为最大长度比例
- _width = min(30,collen*0.9)
- ws.column_dimensions[letter].width = _width
- for i in df.index:
- # 设置单元格对齐方式 Alignment(horizontal=水平对齐模式,vertical=垂直对齐模式,text_rotation=旋转角度,wrap_text=是否自动换行)
- alignment = Alignment(horizontal='center', vertical='center', text_rotation=0, wrap_text=True)
- format_border(ws,columns[0], 0, columns[-1], len(df)) # 根据实际列数量修改
- for j in columns:
- ws[j+str(i+1)].alignment = alignment
- list_width = []
- for col in df.columns:
- # 获取列序号
- index = list(df.columns).index(col)
- # 获取行字母表头
- letter = get_column_letter(index+1)
- # 获取当前列最大宽度
- list_width.append(len(str(df[col][i]).encode()))
- ws.row_dimensions[index].height = 20+max(list_width)//30*5
- wb.save(target)
- def getDocument(list_query,columns,table_name="document",table_index="document_index",thread_count=30,sort_column="page_time"):
- task_queue = Queue()
- for _q in list_query:
- task_queue.put(_q)
- print("task_queue_size",task_queue.qsize())
- result_queue = Queue()
- list_row = []
- ots_client = getConnect_ots()
- def _handle(_dict,result_queue,ots_client):
- try:
- item = _dict.get("query")
- _limit = _dict.get("limit")
- keyword = _dict.get("keyword")
- l_rows = []
- rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
- SearchQuery(item,sort=Sort(sorters=[FieldSort(sort_column,SortOrder.DESC)]),limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- dict_row = getRow_ots(rows)
- if keyword is not None:
- for _row in dict_row:
- _row["keyword"] = keyword
- l_rows.extend(dict_row)
- log("total count:%d"%total_count)
- _count = len(dict_row)
- while next_token:
- if _limit and len(l_rows)>=_limit:
- break
- rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
- SearchQuery(item,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- dict_row = getRow_ots(rows)
- if keyword is not None:
- for _row in dict_row:
- _row["keyword"] = keyword
- l_rows.extend(dict_row)
- _count += len(dict_row)
- print("%d/%d"%(_count,total_count))
- result_queue.put(l_rows)
- except Exception as e:
- traceback.print_exc()
- mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count,ots_client=ots_client)
- mt.run()
- while 1:
- try:
- dict_row = result_queue.get(False)
- list_row.extend(dict_row)
- except Exception as e:
- break
- return list_row
- class ExportEntity():
- def __init__(self,table,table_index,list_query,columns,just_get_totol_count=False):
- self.table = table
- self.table_index = table_index
- self.ots_client = getConnect_ots()
- self.task_queue = Queue()
- self.result_queue = Queue()
- for _q in list_query:
- self.task_queue.put(_q)
- def _handle(self,item,task_queue):
- self.ots_client.search(self.table,self.table_index,
- SearchQuery())
- def export(self):
- pass
- class A():
- pass
- def test(a,bc={"a":2}):
- pass
- if __name__ == '__main__':
- # print(dir(test))
- # _sign = inspect.signature(test)
- # print(_sign.parameters)
- # for _o in _sign.parameters:
- # print(_sign.parameters[_o].default)
- a = A()
- print(str(a))
|