interface.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import requests
  2. import json
  3. from urllib import request
  4. from requests.exceptions import ConnectionError
  5. def check_net(testserver):
  6. try:
  7. ret = request.urlopen(url=testserver, timeout=10.0)
  8. # print(ret)
  9. except:
  10. return False
  11. return True
  12. interface_url = "http://192.168.0.115:15010/convert"
  13. # interface_url = "http://192.168.2.102:15010/convert"
  14. # if not check_net(interface_url):
  15. # interface_url = "http://47.98.57.0:15015/convert"
  16. print(interface_url)
  17. DEFAULT_TIMEOUT = 3000
  18. import traceback
  19. import base64
  20. from BaseDataMaintenance.common.Utils import log
  21. def getAttachDealInterface(_data,_type,path="",restry=1,kwargs={},url=interface_url,timeout=DEFAULT_TIMEOUT,session=None):
  22. _succeed = False
  23. _html = ""
  24. swf_images = []
  25. for i in range(restry):
  26. try:
  27. if path!="":
  28. _json = {"file_path":path,
  29. "type":_type}
  30. else:
  31. _json = {"file":_data,
  32. "type":_type}
  33. if len(kwargs.keys())>0:
  34. _json.update(kwargs)
  35. _json["timeout"] = timeout
  36. with requests.Session() as sess:
  37. _resp = sess.post(url,data=_json,timeout=timeout+100)
  38. if _resp.status_code==200:
  39. _result = json.loads(_resp.content.decode())
  40. _html = "".join(_result.get("result_html",""))
  41. swf_images = _result.get("swf_images",[])
  42. classification = _result.get("classification","")
  43. if _result["is_success"]==1:
  44. _succeed = True
  45. # print(_result)
  46. _html = "".join(_result["result_html"])
  47. return _succeed,_html,swf_images,classification
  48. else:
  49. pass
  50. except ConnectionError as e1:
  51. raise e1
  52. except Exception as e:
  53. traceback.print_exc()
  54. _succeed = False
  55. _html += "Exception:%s"%(str(e))
  56. return _succeed,_html,swf_images,""
  57. import time
  58. import hmac
  59. import hashlib
  60. import urllib.parse
  61. import requests
  62. ACCESS_TOKEN_SUANFA = "https://oapi.dingtalk.com/robot/send?access_token=eec7d420c64fc7eb037561f40c837c934205b1b7751ae36151d885948c1a4a1d"
  63. ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
  64. def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False):
  65. timestamp = str(round(time.time() * 1000))
  66. secret = 'SECb1c5d36f73fb7cd36f91c71cb05441a7bbdad872e051234a626c7d7ceba6ee6a'
  67. secret_enc = secret.encode('utf-8')
  68. string_to_sign = '{}\n{}'.format(timestamp, secret)
  69. string_to_sign_enc = string_to_sign.encode('utf-8')
  70. hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
  71. sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
  72. # print(timestamp)
  73. # print(sign)
  74. #导入依赖库
  75. headers={'Content-Type': 'application/json'} #定义数据类型
  76. webhook = "%s&timestamp=%s&sign=%s"%(access_token,timestamp,sign)
  77. #定义要发送的数据
  78. #"at": {"atMobiles": "['"+ mobile + "']"
  79. data = {
  80. "msgtype": "text",
  81. "text": {"content": msg},
  82. "isAtAll": False,
  83. "at":{"isAtAll": atAll}
  84. }
  85. res = requests.post(webhook, data=json.dumps(data), headers=headers) #发送post请求
  86. # print(res.status_code)
  87. if __name__=="__main__":
  88. # print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment/readme.md","rb").read()),"pdf"))
  89. # sentMsgToDD("测试消息")
  90. sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)