#coding:UTF8
"""Helpers for exporting tender documents from Tablestore (OTS) into column data and Excel.

NOTE(review): this module was reconstructed from a copy whose line breaks and
indentation had been lost; nesting that could not be derived from syntax alone
is marked with NOTE(review) comments below.
"""
# Import order is kept as in the original on purpose: the star imports may
# (re)bind names, so reordering could change which binding wins.
import re
import traceback
from utils.Utils import *
from dataSource.source import getConnect_ots
from utils.multiThread import MultiThreadHandler
from queue import Queue
import json
from utils.hashUtil import aesCipher
import inspect
from tablestore import *
from Crypto.Cipher import PKCS1_v1_5 as Cipher_pksc1_v1_5
from Crypto.PublicKey import RSA
from queue import Empty

# Excel handling library and its style classes.
import openpyxl
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side, colors
import pandas as pd
from openpyxl.utils import get_column_letter
import os

# Module-wide registry of export columns: the set gives O(1) membership tests,
# the list remembers first-seen order.
set_columns = set()
list_df_columns = []

# Words stripped from a document title to build the "公告标题_refine" column.
_TITLE_NOISE_PATTERN = r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同'


def _drain_queue(result_queue):
    """Pop and return every item currently in *result_queue* without blocking."""
    items = []
    while True:
        try:
            items.append(result_queue.get(False))
        except Empty:
            return items


def rsa_encrpt(string, public_key):
    """Encrypt *string* with an RSA public key using PKCS#1 v1.5 padding.

    Args:
        string: text to encrypt; it is encoded with ``str.encode()`` first.
        public_key: key material accepted by ``RSA.importKey``.

    Returns:
        The value returned by ``cipher.encrypt`` (no base64 step is applied
        here, so callers that need JSEncrypt-compatible text must encode it).
    """
    rsakey = RSA.importKey(public_key)  # load the public key
    cipher = Cipher_pksc1_v1_5.new(rsakey)
    return cipher.encrypt(string.encode())


def getOneContact(contacts,tojson=True,mobile_first=True,mobile_only=True,high_level_first=True):
    """Pick one ``(person, number)`` pair from a list of contact dicts.

    Args:
        contacts: list of contact dicts, a JSON string encoding such a list,
            or None. The list is sorted in place and a contact flagged
            ``is_mobile == 1`` gets its ``mobile_no`` overwritten by ``phone_no``.
        tojson: unused; JSON decoding is decided by ``isinstance(contacts, str)``.
        mobile_first: return as soon as a contact with a mobile number and a
            non-empty person name has been seen.
        mobile_only: only consulted when ``mobile_first`` is false.
        high_level_first: unused; contacts are always sorted by ``level``
            in descending order.

    Returns:
        ``(person, number)``; ``("", "")`` when *contacts* is None. Contacts
        with ``level == 40`` get "(法人)" appended to a non-empty person name.
        When no mobile contact qualifies, the last phone contact seen is used.
    """
    mobile_person = ""
    mobile_no = ""
    phone_person = ""
    phone_no = ""
    if contacts is None:
        return "", ""
    try:
        list_contacts = json.loads(contacts) if isinstance(contacts, str) else contacts
        list_contacts.sort(key=lambda x: x.get("level", 0), reverse=True)
        for _contact in list_contacts:
            is_legal_level = _contact.get("level") == 40
            if _contact.get("is_mobile", 0) == 1:
                _contact["mobile_no"] = _contact.get("phone_no")
            if _contact.get("mobile_no", "") != "":
                mobile_person = _contact.get("contact_person", "")
                mobile_no = _contact.get("mobile_no", "")
                if is_legal_level and mobile_person != "":
                    mobile_person += "(法人)"
            if _contact.get("phone_no", "") != "":
                phone_person = _contact.get("contact_person", "")
                phone_no = _contact.get("phone_no", "")
                if is_legal_level and phone_person != "":
                    phone_person += "(法人)"
            # NOTE(review): the source had lost its indentation; this check is
            # assumed to sit inside the loop so the highest-level mobile
            # contact wins — confirm against the original file.
            if mobile_first:
                if mobile_no != "" and mobile_person != "":
                    return mobile_person, mobile_no
            elif mobile_only:
                return mobile_person, mobile_no
    except Exception:
        # Best effort: malformed contact data must not abort an export, but
        # the failure is reported like the other handlers in this module do.
        traceback.print_exc()
    return phone_person, phone_no


def getMobiles(contacts,to_json=True):
    """Return ``[person, mobile_no]`` for every contact with a non-empty ``mobile_no``.

    Args:
        contacts: JSON string (when *to_json* is true) or an already decoded
            list of contact dicts.
        to_json: whether *contacts* must be decoded with ``json.loads``.
    """
    list_contacts = json.loads(contacts) if to_json else contacts
    return [
        [_c.get("contact_person", ""), _c.get("mobile_no")]
        for _c in list_contacts
        if _c.get("mobile_no", "") != ""
    ]


def set_dict_item_columns(set_columns1,list_df_columns1,_dict,name,v):
    """Store ``getLegal_str(v)`` under *name* and register *name* as a column.

    A name not yet in *set_columns1* is added to it and its ``getLegal_str``
    form is appended to *list_df_columns1*, preserving first-seen order.
    """
    _dict[name] = getLegal_str(v)
    if name not in set_columns1:
        set_columns1.add(name)
        list_df_columns1.append(getLegal_str(name))


def set_dict_item(_dict,name,v):
    """Same as ``set_dict_item_columns`` but using the module-level column registry."""
    set_dict_item_columns(set_columns, list_df_columns, _dict, name, v)


def _is_positive_amount(value):
    """Return True when *value* (number, numeric string or "") converts to a float > 0."""
    return float(0 if value == "" else value) > 0


def getRowData_document(df_data,rows,set_line,list_keyword,dict_channel):
    """Flatten document rows into the column-oriented dict *df_data*.

    Args:
        df_data: dict mapping column name to a list of values; one value per
            column is appended for every row.
        rows: iterable of document dicts.
        set_line: unused at present (the de-duplication that used it is disabled).
        list_keyword: keywords searched for in title + body + attachment text.
        dict_channel: maps a row's ``docchannel`` value to a display name.

    Columns are registered through ``set_dict_item`` in call order, so the
    order of the statements below determines the exported column order.
    """
    # NOTE(review): as written both replace() calls map a parenthesis to
    # itself; presumably full-width parentheses were meant — confirm.
    keyword_pattern = re.compile("|".join(
        re.escape(str(a).replace("(", "(").replace(")", ")")) for a in list_keyword))

    for row in rows:
        item = {}
        _dict = row
        for column, key in (("docid", "docid"), ("公告标题", "doctitle"),
                            ("公告内容", "doctextcon"), ("附件内容", "attachmenttextcon")):
            set_dict_item(item, column, _dict.get(key, ""))
        set_dict_item(item, "公告类别", dict_channel.get(_dict.get("docchannel", ""), ""))

        # Missing or None text fields are treated as empty so a sparse row
        # cannot break the concatenation.
        text = "".join(str(row.get(key) or "")
                       for key in ("doctitle", "doctextcon", "attachmenttextcon"))
        text = re.sub(r"\s", "", text.replace("(", "(").replace(")", ")"))
        # Sorted so the exported keyword list is deterministic.
        set_dict_item(item, "关键词", ",".join(sorted(set(keyword_pattern.findall(text)))))

        for column, key in (("产品", "product"), ("省份", "province"), ("城市", "city"),
                            ("区县", "district"), ("发布时间", "page_time"),
                            ("截标时间", "time_bidclose"), ("开标时间", "time_bidopen"),
                            ("创建时间", "crtime"), ("招标方式", "bidway"),
                            ("行业一级分类", "industry"), ("行业二级分类", "info_type")):
            set_dict_item(item, column, _dict.get(key, ""))
        set_dict_item(item, "uuid", _dict.get("uuid"))
        set_dict_item(item, "公告标题_refine",
                      re.sub(_TITLE_NOISE_PATTERN, '', _dict.get("doctitle", "")))
        for column, key in (("项目编号", "project_code"), ("招标单位", "tenderee"),
                            ("招标联系人", "tenderee_contact"), ("招标联系人电话", "tenderee_phone"),
                            ("代理单位", "agency"), ("代理联系人", "agency_contact"),
                            ("代理联系人电话", "agency_phone")):
            set_dict_item(item, column, _dict.get(key, ""))
        set_dict_item(item, "比地招标公告地址",
                      "http://www.bidizhaobiao.com/excel_detail.do?code=%s"
                      % (str(aesCipher.encrypt('{"docid":%d}' % _dict.get("docid")))))

        sub_docs_json = _dict.get("sub_docs_json")
        set_tenderer = set()
        if sub_docs_json is not None:
            for _doc in json.loads(sub_docs_json):
                if "win_tenderer" in _doc:
                    set_dict_item(item, "中标单位", _doc["win_tenderer"])
                if "second_tenderer" in _doc:
                    set_tenderer.add(_doc.get("second_tenderer"))
                if "third_tenderer" in _doc:
                    set_tenderer.add(_doc.get("third_tenderer"))
                if "win_tenderee_manager" in _doc:
                    set_dict_item(item, "中标单位联系人", _doc["win_tenderee_manager"])
                if "win_tenderee_phone" in _doc:
                    set_dict_item(item, "中标单位联系电话", _doc["win_tenderee_phone"])
                if "win_bid_price" in _doc and _is_positive_amount(_doc["win_bid_price"]):
                    set_dict_item(item, "中标金额", _doc["win_bid_price"])
                if "bidding_budget" in _doc and _is_positive_amount(_doc["bidding_budget"]):
                    set_dict_item(item, "招标金额", _doc["bidding_budget"])
        # None/empty names are dropped (None would make join() raise) and the
        # rest are sorted for a stable export.
        set_dict_item(item, "入围供应商", ",".join(sorted(str(t) for t in set_tenderer if t)))

        # Every row must carry these columns even when no sub document set them.
        for column in ("招标金额", "中标金额", "中标单位", "中标单位联系人", "中标单位联系电话"):
            if column not in item:
                set_dict_item(item, column, "")

        for k, v in item.items():
            df_data.setdefault(k, []).append(v)


def getDictEnterprise(list_enterprise,columns_to_get = ["reg_capital","actual_capital","industry","estiblish_time","social_staff_num","zhong_biao_number","tou_biao_number","credit_code"]):
    """Fetch enterprise rows and their contacts from OTS for the given names.

    Args:
        list_enterprise: enterprise names; each is converted with ``str``.
        columns_to_get: columns requested from the "enterprise" table
            (the default list is never mutated here).

    Returns:
        dict mapping enterprise name to its row dict, extended with a
        "contacts" list read from the "enterprise_contact" table (status 1,
        at most 10 entries). Names for which ``getRow_ots_primary`` yields
        None, or whose lookup raised, are absent.
    """
    task_queue = Queue()
    result_queue = Queue()
    for _enterprise in list_enterprise:
        task_queue.put(str(_enterprise))

    def _handle(item,result_queue,ots_client):
        try:
            primary_key = [("name", item)]
            consumed, return_row, next_token = ots_client.get_row(
                "enterprise", primary_key, columns_to_get, None, 1)
            dict_data = getRow_ots_primary(return_row)
            if dict_data is not None:
                bool_q = BoolQuery(must_queries=[TermQuery("enterprise_name", item),
                                                 TermQuery("status", 1)])
                rows, next_token, total_count, is_allowed = ots_client.search(
                    "enterprise_contact", "enterprise_contact_index",
                    SearchQuery(bool_q, limit=10),
                    columns_to_get=ColumnsToGet(
                        ["contact_person", "phone_no", "is_mobile", "level",
                         "is_legal_person", "is_manager", "is_shareholder"],
                        ColumnReturnType.SPECIFIED))
                dict_data["contacts"] = getRow_ots(rows)
                result_queue.put({item: dict_data})
        except Exception:
            # One failing lookup must not stop the remaining tasks.
            traceback.print_exc()

    ots_client = getConnect_ots()
    mt = MultiThreadHandler(task_queue=task_queue,task_handler=_handle,result_queue=result_queue,thread_count=50,ots_client=ots_client)
    mt.run()

    dict_enterprise = {}
    for _dict in _drain_queue(result_queue):
        dict_enterprise.update(_dict)
    return dict_enterprise


def splitIntoList(_str,_splitkeys):
    """Split *_str* with the regex *_splitkeys* and drop blank pieces.

    ``re.split`` yields None for capture groups that did not take part in a
    match; those entries are skipped as well.
    """
    return [_word for _word in re.split(_splitkeys, _str)
            if _word is not None and _word.strip() != ""]


def _fill_contact(df_data, row, person_column, phone_column, enterprise_info, **contact_kwargs):
    """Fill a row's empty phone cell (and its person cell) from the enterprise contacts."""
    if df_data[phone_column][row] != "":
        return
    contacts = enterprise_info.get("contacts")
    if contacts is None:
        return
    _person, _phone = getOneContact(contacts, **contact_kwargs)
    df_data[person_column][row] = _person
    df_data[phone_column][row] = _phone


def fixContactPerson(df_data,list_df_columns,get_legal_person=False):
    """Complete missing contact data in *df_data* from the enterprise tables.

    Args:
        df_data: column dict as built by ``getRowData_document``; modified in
            place. The columns "信用代码", "招标人采购系统" and "招标人类型" are
            appended with one value per row, taken from the tenderee's record.
        list_df_columns: column-name list; the three names above are appended.
        get_legal_person: when true, the winner's contact is replaced by the
            enterprise's legal person if ``phone_number`` has 11 characters
            and starts with "1"; otherwise an empty winner phone is filled
            from the contact list.
    """
    if len(df_data) > 0:
        set_enterprise = (set(df_data["招标单位"]) | set(df_data["代理单位"])
                          | set(df_data["中标单位"]))
        set_enterprise.discard("")
        set_enterprise.discard(None)

        # No names means nothing to look up, so the OTS round trip is skipped.
        dict_enterprise = {}
        if set_enterprise:
            dict_enterprise = getDictEnterprise(list(set_enterprise),columns_to_get = ["procurement_system","company_org_type","reg_capital","actual_capital","contacts","estiblish_time","social_staff_num","zhong_biao_number","tou_biao_number","credit_code","legal_person_name","phone_number"])

        # The loop runs even without enterprise data so the three extra
        # columns always get one value per row and stay aligned with the
        # names appended to list_df_columns below.
        for _i in range(len(df_data["招标单位"])):
            tenderee_info = dict_enterprise.get(df_data["招标单位"][_i], {})
            _fill_contact(df_data, _i, "招标联系人", "招标联系人电话", tenderee_info)
            df_data.setdefault("信用代码", []).append(tenderee_info.get("credit_code", ""))
            df_data.setdefault("招标人采购系统", []).append(tenderee_info.get("procurement_system", ""))
            df_data.setdefault("招标人类型", []).append(tenderee_info.get("company_org_type", ""))

            agency_info = dict_enterprise.get(df_data["代理单位"][_i], {})
            _fill_contact(df_data, _i, "代理联系人", "代理联系人电话", agency_info)

            winner_info = dict_enterprise.get(df_data["中标单位"][_i], {})
            if get_legal_person:
                _person = winner_info.get("legal_person_name", "")
                # A stored None or number must not break the length check.
                _phone = str(winner_info.get("phone_number") or "")
                if len(_phone) == 11 and _phone[0] == "1":
                    df_data["中标单位联系人"][_i] = _person
                    df_data["中标单位联系电话"][_i] = _phone
            else:
                _fill_contact(df_data, _i, "中标单位联系人", "中标单位联系电话",
                              winner_info, mobile_only=True)
    # NOTE(review): assumed to run unconditionally, as the last statement of
    # the function — confirm against the original indentation.
    list_df_columns.extend(['信用代码','招标人采购系统','招标人类型'])


def generateBoolShouldQuery(list_field,list_should_words,cls):
    """Build a ``BoolQuery`` whose should-clauses cover every field/word pair.

    Args:
        list_field: list of field names.
        list_should_words: list of values. For ``RangeQuery`` each entry must
            be a sequence of the positional arguments that follow the field
            name; for ``WildcardQuery`` the word is wrapped as ``*word*``.
        cls: one of TermQuery, MatchPhraseQuery, WildcardQuery, RangeQuery.

    Raises:
        AssertionError: on a wrong argument type or unsupported *cls*.
    """
    assert isinstance(list_field,(list))
    assert isinstance(list_should_words,(list))
    assert cls in (TermQuery,MatchPhraseQuery,WildcardQuery,RangeQuery)
    list_should_q = []
    for word in list_should_words:
        for field in list_field:
            if cls is RangeQuery:
                # The range bounds come from the word, not from the field name.
                list_should_q.append(cls(field, *word))
            elif cls is WildcardQuery:
                list_should_q.append(cls(field, "*%s*" % word))
            else:
                list_should_q.append(cls(field, word))
    return BoolQuery(should_queries=list_should_q)


def my_border(t_border, b_border, l_border, r_border):
    """Return a black ``Border`` with the given top/bottom/left/right styles."""
    return Border(top=Side(border_style=t_border, color=colors.BLACK),
                  bottom=Side(border_style=b_border, color=colors.BLACK),
                  left=Side(border_style=l_border, color=colors.BLACK),
                  right=Side(border_style=r_border, color=colors.BLACK))


def format_border(ws,s_column, s_index, e_column , e_index):
    """Give every cell in the rectangle ``s_column+s_index : e_column+e_index`` thin borders."""
    thin_border = my_border('thin', 'thin', 'thin', 'thin')
    for row in tuple(ws[s_column + str(s_index):e_column + str(e_index)]):
        for cell in row:
            cell.border = thin_border


def adjust_excel(source,target,columns=("A","B","C","D")):
    """Copy workbook *source* to *target* with widths, heights, borders and alignment set.

    Args:
        source: path of the workbook to read.
        target: path the formatted workbook is saved to.
        columns: column letters that receive borders and centred, wrapped
            alignment; adapt to the real number of columns.

    Column width is 0.9 x the longest encoded cell length, capped at 30; row
    height is 20 plus 5 for every 30 bytes of the row's longest cell.
    """
    wb = openpyxl.load_workbook(source)
    alignment = Alignment(horizontal='center', vertical='center', text_rotation=0, wrap_text=True)
    for sheet in wb.sheetnames:
        ws = wb[sheet]
        df = pd.read_excel(source, sheet_name=sheet)
        if len(df.columns) == 0:
            continue  # an empty sheet has nothing to measure
        # The header is appended as the last data row so it is measured too.
        header_idx = len(df)
        df.loc[header_idx] = list(df.columns)

        for col_pos, col in enumerate(df.columns):
            letter = get_column_letter(col_pos + 1)
            collen = df[col].apply(lambda x: len(str(x).encode())).max()
            ws.column_dimensions[letter].width = min(30, collen * 0.9)

        format_border(ws, columns[0], 1, columns[-1], len(df))
        for i in df.index:
            # Sheet row 1 holds the header; DataFrame row i is sheet row i+2.
            sheet_row = 1 if i == header_idx else int(i) + 2
            for j in columns:
                ws[j + str(sheet_row)].alignment = alignment
            longest = max(len(str(value).encode()) for value in df.loc[i])
            ws.row_dimensions[sheet_row].height = 20 + longest // 30 * 5
    wb.save(target)


def getDocument(list_query,columns,table_name="document",table_index="document_index",thread_count=30,sort_column="page_time"):
    """Run several OTS search queries concurrently and return all rows found.

    Args:
        list_query: list of dicts with the keys "query" (the search query),
            optional "limit" (paging stops once at least this many rows were
            collected) and optional "keyword" (stored on every row as "keyword").
        columns: columns to return.
        table_name, table_index: table and search index to query.
        thread_count: number of worker threads handed to ``MultiThreadHandler``.
        sort_column: field the first page is sorted by, descending.

    Returns:
        A single list with the rows of every query that completed; a query
        that raises is logged and contributes nothing.
    """
    task_queue = Queue()
    for _q in list_query:
        task_queue.put(_q)
    print("task_queue_size",task_queue.qsize())
    result_queue = Queue()
    ots_client = getConnect_ots()

    def _handle(_dict,result_queue,ots_client):
        try:
            item = _dict.get("query")
            _limit = _dict.get("limit")
            keyword = _dict.get("keyword")

            def _fetch(search_query):
                """Run one search page and return (rows, next_token, total_count)."""
                rows, token, total, is_all_succeed = ots_client.search(
                    table_name, table_index, search_query,
                    ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
                page = getRow_ots(rows)
                if keyword is not None:
                    for _row in page:
                        _row["keyword"] = keyword
                return page, token, total

            l_rows, next_token, total_count = _fetch(SearchQuery(
                item, sort=Sort(sorters=[FieldSort(sort_column, SortOrder.DESC)]),
                limit=100, get_total_count=True))
            log("total count:%d"%total_count)
            _count = len(l_rows)
            while next_token:
                if _limit and len(l_rows) >= _limit:
                    break
                page, next_token, total_count = _fetch(SearchQuery(
                    item, next_token=next_token, limit=100, get_total_count=True))
                l_rows.extend(page)
                _count += len(page)
                print("%d/%d"%(_count,total_count))
            result_queue.put(l_rows)
        except Exception:
            traceback.print_exc()

    mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count,ots_client=ots_client)
    mt.run()

    list_row = []
    for l_rows in _drain_queue(result_queue):
        list_row.extend(l_rows)
    return list_row


class ExportEntity():
    """Unfinished skeleton of a queue-based exporter for one table/index pair."""

    def __init__(self,table,table_index,list_query,columns,just_get_totol_count=False):
        """Store the target table and enqueue every query of *list_query*.

        *columns* and *just_get_totol_count* are accepted but not used yet.
        """
        self.table = table
        self.table_index = table_index
        self.ots_client = getConnect_ots()
        self.task_queue = Queue()
        self.result_queue = Queue()
        for _q in list_query:
            self.task_queue.put(_q)

    def _handle(self,item,task_queue):
        """Placeholder: issues a search with an empty ``SearchQuery`` and ignores the result."""
        self.ots_client.search(self.table,self.table_index,
                               SearchQuery())

    def export(self):
        """Not implemented yet."""
        pass


class A():
    """Empty class used by the ad-hoc experiment in the ``__main__`` section."""
    pass


def test(a,bc={"a":2}):
    """Dummy function kept for the signature-inspection experiment below."""
    pass


if __name__ == '__main__':
    # print(dir(test))
    # _sign = inspect.signature(test)
    # print(_sign.parameters)
    # for _o in _sign.parameters:
    #     print(_sign.parameters[_o].default)
    a = A()
    print(str(a))