1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- import requests
- import json
- from urllib import request
- from requests.exceptions import ConnectionError
- def check_net(testserver):
- try:
- ret = request.urlopen(url=testserver, timeout=10.0)
- # print(ret)
- except:
- return False
- return True
- interface_url = "http://192.168.0.115:15010/convert"
- # interface_url = "http://192.168.2.102:15010/convert"
- # if not check_net(interface_url):
- # interface_url = "http://47.98.57.0:15015/convert"
- print(interface_url)
- DEFAULT_TIMEOUT = 3000
- import traceback
- import base64
- def getAttachDealInterface(_data,_type,path="",restry=1):
- _succeed = False
- _html = ""
- swf_images = []
- for i in range(restry):
- try:
- if path!="":
- _json = {"file_path":path,
- "type":_type}
- else:
- _json = {"file":_data,
- "type":_type}
- headers = {"Content-Type":"application/json"}
- _resp = requests.post(interface_url,data=_json,timeout=DEFAULT_TIMEOUT)
- if _resp.status_code==200:
- _result = json.loads(_resp.content.decode())
- _html = "".join(_result.get("result_html",""))
- swf_images = _result.get("swf_images",[])
- classification = _result.get("classification","")
- if _result["is_success"]==1:
- _succeed = True
- # print(_result)
- _html = "".join(_result["result_html"])
- return _succeed,_html,swf_images,classification
- else:
- pass
- except ConnectionError as e1:
- raise e1
- except Exception as e:
- traceback.print_exc()
- _succeed = False
- _html += "Exception:%s"%(str(e))
- return _succeed,_html,swf_images,""
- import time
- import hmac
- import hashlib
- import urllib.parse
- import requests
- ACCESS_TOKEN_SUANFA = "https://oapi.dingtalk.com/robot/send?access_token=eec7d420c64fc7eb037561f40c837c934205b1b7751ae36151d885948c1a4a1d"
- ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
- def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False):
- timestamp = str(round(time.time() * 1000))
- secret = 'SECb1c5d36f73fb7cd36f91c71cb05441a7bbdad872e051234a626c7d7ceba6ee6a'
- secret_enc = secret.encode('utf-8')
- string_to_sign = '{}\n{}'.format(timestamp, secret)
- string_to_sign_enc = string_to_sign.encode('utf-8')
- hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
- sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
- # print(timestamp)
- # print(sign)
- #导入依赖库
- headers={'Content-Type': 'application/json'} #定义数据类型
- webhook = "%s×tamp=%s&sign=%s"%(access_token,timestamp,sign)
- #定义要发送的数据
- #"at": {"atMobiles": "['"+ mobile + "']"
- data = {
- "msgtype": "text",
- "text": {"content": msg},
- "isAtAll": False,
- "at":{"isAtAll": atAll}
- }
- res = requests.post(webhook, data=json.dumps(data), headers=headers) #发送post请求
- # print(res.status_code)
- if __name__=="__main__":
- # print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment/readme.md","rb").read()),"pdf"))
- # sentMsgToDD("测试消息")
- sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
|