interface.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import requests
  2. import json
  3. from urllib import request
  4. from requests.exceptions import ConnectionError
  5. def check_net(testserver):
  6. try:
  7. ret = request.urlopen(url=testserver, timeout=10.0)
  8. # print(ret)
  9. except:
  10. return False
  11. return True
  12. interface_url = "http://192.168.0.115:15010/convert"
  13. # interface_url = "http://192.168.2.102:15010/convert"
  14. # if not check_net(interface_url):
  15. # interface_url = "http://47.98.57.0:15015/convert"
  16. print(interface_url)
  17. DEFAULT_TIMEOUT = 3000
  18. import traceback
  19. import base64
  20. def getAttachDealInterface(_data,_type,path="",restry=1,kwargs={},url=interface_url,timeout=DEFAULT_TIMEOUT):
  21. _succeed = False
  22. _html = ""
  23. swf_images = []
  24. for i in range(restry):
  25. try:
  26. if path!="":
  27. _json = {"file_path":path,
  28. "type":_type}
  29. else:
  30. _json = {"file":_data,
  31. "type":_type}
  32. if len(kwargs.keys())>0:
  33. _json.update(kwargs)
  34. headers = {"Content-Type":"application/json"}
  35. _resp = requests.post(url,data=_json,timeout=timeout)
  36. if _resp.status_code==200:
  37. _result = json.loads(_resp.content.decode())
  38. _html = "".join(_result.get("result_html",""))
  39. swf_images = _result.get("swf_images",[])
  40. classification = _result.get("classification","")
  41. if _result["is_success"]==1:
  42. _succeed = True
  43. # print(_result)
  44. _html = "".join(_result["result_html"])
  45. return _succeed,_html,swf_images,classification
  46. else:
  47. pass
  48. except ConnectionError as e1:
  49. raise e1
  50. except Exception as e:
  51. traceback.print_exc()
  52. _succeed = False
  53. _html += "Exception:%s"%(str(e))
  54. return _succeed,_html,swf_images,""
  55. import time
  56. import hmac
  57. import hashlib
  58. import urllib.parse
  59. import requests
  60. ACCESS_TOKEN_SUANFA = "https://oapi.dingtalk.com/robot/send?access_token=eec7d420c64fc7eb037561f40c837c934205b1b7751ae36151d885948c1a4a1d"
  61. ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
  62. def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False):
  63. timestamp = str(round(time.time() * 1000))
  64. secret = 'SECb1c5d36f73fb7cd36f91c71cb05441a7bbdad872e051234a626c7d7ceba6ee6a'
  65. secret_enc = secret.encode('utf-8')
  66. string_to_sign = '{}\n{}'.format(timestamp, secret)
  67. string_to_sign_enc = string_to_sign.encode('utf-8')
  68. hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
  69. sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
  70. # print(timestamp)
  71. # print(sign)
  72. #导入依赖库
  73. headers={'Content-Type': 'application/json'} #定义数据类型
  74. webhook = "%s&timestamp=%s&sign=%s"%(access_token,timestamp,sign)
  75. #定义要发送的数据
  76. #"at": {"atMobiles": "['"+ mobile + "']"
  77. data = {
  78. "msgtype": "text",
  79. "text": {"content": msg},
  80. "isAtAll": False,
  81. "at":{"isAtAll": atAll}
  82. }
  83. res = requests.post(webhook, data=json.dumps(data), headers=headers) #发送post请求
  84. # print(res.status_code)
  85. if __name__=="__main__":
  86. # print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment/readme.md","rb").read()),"pdf"))
  87. # sentMsgToDD("测试消息")
  88. sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)