interface.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import requests
  2. import json
  3. from urllib import request
  4. from requests.exceptions import ConnectionError
  5. def check_net(testserver):
  6. try:
  7. ret = request.urlopen(url=testserver, timeout=10.0)
  8. # print(ret)
  9. except:
  10. return False
  11. return True
  12. interface_url = "http://192.168.0.115:15010/convert"
  13. # interface_url = "http://192.168.2.102:15010/convert"
  14. # if not check_net(interface_url):
  15. # interface_url = "http://47.98.57.0:15015/convert"
  16. print(interface_url)
  17. DEFAULT_TIMEOUT = 3000
  18. import traceback
  19. import base64
  20. def getAttachDealInterface(_data,_type,path="",restry=1):
  21. _succeed = False
  22. _html = ""
  23. swf_images = []
  24. for i in range(restry):
  25. try:
  26. if path!="":
  27. _json = {"file_path":path,
  28. "type":_type}
  29. else:
  30. _json = {"file":_data,
  31. "type":_type}
  32. headers = {"Content-Type":"application/json"}
  33. _resp = requests.post(interface_url,data=_json,timeout=DEFAULT_TIMEOUT)
  34. if _resp.status_code==200:
  35. _result = json.loads(_resp.content.decode())
  36. _html = "".join(_result.get("result_html",""))
  37. swf_images = _result.get("swf_images",[])
  38. classification = _result.get("classification","")
  39. if _result["is_success"]==1:
  40. _succeed = True
  41. # print(_result)
  42. _html = "".join(_result["result_html"])
  43. return _succeed,_html,swf_images,classification
  44. else:
  45. pass
  46. except ConnectionError as e1:
  47. raise e1
  48. except Exception as e:
  49. traceback.print_exc()
  50. _succeed = False
  51. _html += "Exception:%s"%(str(e))
  52. return _succeed,_html,swf_images,""
  53. import time
  54. import hmac
  55. import hashlib
  56. import urllib.parse
  57. import requests
  58. ACCESS_TOKEN_SUANFA = "https://oapi.dingtalk.com/robot/send?access_token=eec7d420c64fc7eb037561f40c837c934205b1b7751ae36151d885948c1a4a1d"
  59. ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
  60. def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False):
  61. timestamp = str(round(time.time() * 1000))
  62. secret = 'SECb1c5d36f73fb7cd36f91c71cb05441a7bbdad872e051234a626c7d7ceba6ee6a'
  63. secret_enc = secret.encode('utf-8')
  64. string_to_sign = '{}\n{}'.format(timestamp, secret)
  65. string_to_sign_enc = string_to_sign.encode('utf-8')
  66. hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
  67. sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
  68. # print(timestamp)
  69. # print(sign)
  70. #导入依赖库
  71. headers={'Content-Type': 'application/json'} #定义数据类型
  72. webhook = "%s&timestamp=%s&sign=%s"%(access_token,timestamp,sign)
  73. #定义要发送的数据
  74. #"at": {"atMobiles": "['"+ mobile + "']"
  75. data = {
  76. "msgtype": "text",
  77. "text": {"content": msg},
  78. "isAtAll": False,
  79. "at":{"isAtAll": atAll}
  80. }
  81. res = requests.post(webhook, data=json.dumps(data), headers=headers) #发送post请求
  82. # print(res.status_code)
  83. if __name__=="__main__":
  84. # print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment/readme.md","rb").read()),"pdf"))
  85. # sentMsgToDD("测试消息")
  86. sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)