|
@@ -1047,6 +1047,14 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
message_id = frame.headers["message-id"]
|
|
|
subscription = frame.headers.setdefault('subscription', None)
|
|
|
item = json.loads(frame.body)
|
|
|
+
|
|
|
+ for k,v in item.items():
|
|
|
+ try:
|
|
|
+ if isinstance(v,bytes):
|
|
|
+ item[k] = v.decode("utf-8")
|
|
|
+ except Exception as e:
|
|
|
+ log("docid %d types bytes can not decode"%(item.get("docid")))
|
|
|
+ item[k] = ""
|
|
|
dtmp = Document_tmp(item)
|
|
|
|
|
|
dhtml = Document_html({"partitionkey":item.get("partitionkey"),
|
|
@@ -1333,7 +1341,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
if len(_extract_ai.keys())>0:
|
|
|
_new_json,_changed = self.merge_json(_extract_json,_json)
|
|
|
if _changed:
|
|
|
- dtmp.setValue("extract_json_AI",json.dumps(_extract_ai,ensure_ascii=False))
|
|
|
+ dtmp.setValue("extract_json_ai",json.dumps(_extract_ai,ensure_ascii=False))
|
|
|
dtmp.setValue(document_tmp_dochtmlcon,"",False)
|
|
|
dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
|
|
|
dtmp.update_row(self.ots_client)
|