123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648 |
- import os
- import sys
- import tensorflow as tf
- import keras.backend as K
- from keras import Input
- import numpy as np
- np.set_printoptions(threshold=np.inf)
- from keras.engine.base_layer import Layer
- from tensorflow.python.ops.control_flow_ops import while_loop
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
- from click_captcha.utils import box_iou
- from click_captcha.post_process import yolo_head
def contrastive_loss(y_true, y_pred):
    """Contrastive loss (Hadsell, Chopra & LeCun, 2006).

    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf

    Args:
        y_true: pair labels; weights the squared-prediction term, while
            ``1 - y_true`` weights the squared-margin term.
        y_pred: predicted value (distance) for each pair.

    Returns:
        The mean of the combined terms over all elements.
    """
    margin = 1
    # Penalty that grows with the prediction itself.
    pull_term = y_true * K.square(y_pred)
    # Penalty that is non-zero only while the prediction is inside the margin.
    push_term = (1 - y_true) * K.square(K.maximum(margin - y_pred, 0))
    return K.mean(pull_term + push_term)
def focal_loss(gamma=3., alpha=.5, only_tf=True):
    """Create a binary focal loss function.

    Args:
        gamma: exponent of the modulating factor.
        alpha: weight of the positive term; ``1 - alpha`` weights the
            negative term.
        only_tf: when true the loss is built from ``tf`` ops with a fixed
            ``1e-07`` epsilon, otherwise from Keras backend ops with
            ``K.epsilon()``.

    Returns:
        A ``(y_true, y_pred)`` callable returning the summed focal loss.
    """
    def focal_loss_fixed(y_true, y_pred):
        # Predictions at positive positions (1 elsewhere, contributing nothing).
        pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
        # Predictions at negative positions (0 elsewhere, contributing nothing).
        pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))

        if not only_tf:
            positive = K.sum(alpha * K.pow(1. - pt_1, gamma) * K.log(K.epsilon()+pt_1))
            negative = K.sum((1 - alpha) * K.pow(pt_0, gamma) * K.log(1. - pt_0 + K.epsilon()))
            return - positive - negative

        positive = tf.reduce_sum(alpha * tf.pow(1. - pt_1, gamma) * tf.math.log(1e-07 + pt_1))
        negative = tf.reduce_sum((1 - alpha) * tf.pow(pt_0, gamma) * tf.math.log(1. - pt_0 + 1e-07))
        return - positive - negative

    return focal_loss_fixed
def l1_loss():
    """Create a mean-absolute-error loss scaled by 100."""
    def mae(y_true, y_pred):
        abs_error = tf.abs(y_pred - y_true)
        return tf.reduce_mean(abs_error) * 100

    return mae
def l2_loss():
    """Create a mean-squared-error loss."""
    def mse(y_true, y_pred):
        squared_error = tf.square(y_true - y_pred)
        return tf.reduce_mean(squared_error)

    return mse
def l2_focal_loss(threshold=0.2, ratio=1000, reverse=False):
    """Create a squared-error loss that re-weights errors around a threshold.

    Args:
        threshold: absolute-error boundary between "small" and "large" errors.
        ratio: weighting factor; large errors are multiplied by ``ratio``
            (normal mode) or small errors by ``1 / ratio`` (reverse mode).
        reverse: when true, small errors are scaled by ``1 / ratio`` and
            large errors by ``0.1``; otherwise small errors are kept as they
            are and large errors are scaled by ``ratio``.

    Returns:
        A ``(y_true, y_pred)`` callable returning the mean of the squared,
        re-weighted absolute errors.
    """
    def mse(y_true, y_pred):
        abs_error = tf.abs(y_pred - y_true)
        is_small = abs_error <= threshold
        if reverse:
            weighted = tf.where(is_small, 1 / ratio * abs_error, 0.1 * abs_error)
        else:
            weighted = tf.where(is_small, abs_error, ratio * abs_error)
        return tf.reduce_mean(tf.square(weighted))

    return mse
def l1_focal_loss(threshold=0.2):
    """Create an absolute-error loss that ignores errors up to ``threshold``.

    Returns:
        A ``(y_true, y_pred)`` callable returning the sum of the absolute
        errors that exceed ``threshold``; smaller errors count as zero.
    """
    def mae(y_true, y_pred):
        abs_error = tf.abs(y_pred - y_true)
        kept = tf.where(abs_error <= threshold, 0., abs_error)
        return tf.reduce_sum(tf.abs(kept))

    return mae
def l3_loss():
    """Create a loss equal to the mean absolute cubed error."""
    def l3_loss_fixed(y_true, y_pred):
        cubed_error = tf.pow(y_pred - y_true, 3)
        return tf.reduce_mean(tf.abs(cubed_error))

    return l3_loss_fixed
def yolo_loss(args, anchors, num_classes, ignore_thresh=.5, print_loss=False):
    """Return yolo_loss tensor

    Parameters
    ----------
    args: list of tensor
        The first ``len(anchors) // 3`` entries are the network outputs
        (the output of yolo_body or tiny_yolo_body); the remaining entries
        are the matching ``y_true`` tensors (the output of
        preprocess_true_boxes).
    anchors: array, shape=(N, 2), wh
    num_classes: integer
    ignore_thresh: float, the iou threshold whether to ignore object confidence loss
    print_loss: bool, currently unused (the loss-printing code is disabled)

    Returns
    -------
    loss: scalar tensor, the sum over all layers of
        ``10 * xy_loss + 10 * wh_loss + confidence_loss``, each term summed
        over the layer and divided by the batch size.
    """
    # default setting
    num_layers = len(anchors) // 3
    yolo_outputs = args[:num_layers]
    y_true = args[num_layers:]
    anchor_mask = [[6, 7, 8], [3, 4, 5], [0, 1, 2]] if num_layers == 3 else [[3, 4, 5], [1, 2, 3]]
    true_dtype = K.dtype(y_true[0])
    input_shape = K.cast(K.shape(yolo_outputs[0])[1:3] * 32, true_dtype)
    grid_shapes = [K.cast(K.shape(yolo_outputs[layer])[1:3], true_dtype)
                   for layer in range(num_layers)]
    loss = 0
    # batch size, tensor
    m = K.shape(yolo_outputs[0])[0]
    mf = K.cast(m, K.dtype(yolo_outputs[0]))

    for layer in range(num_layers):
        layer_anchors = anchors[anchor_mask[layer]]
        object_mask = y_true[layer][..., 4:5]
        true_class_probs = y_true[layer][..., 5:]

        grid, raw_pred, pred_xy, pred_wh = yolo_head(
            yolo_outputs[layer], layer_anchors, num_classes, input_shape, calc_loss=True)
        pred_box = K.concatenate([pred_xy, pred_wh])

        # Darknet raw box to calculate loss.
        raw_true_xy = y_true[layer][..., :2] * grid_shapes[layer][::-1] - grid
        raw_true_wh = K.log(y_true[layer][..., 2:4] / layer_anchors * input_shape[::-1])
        # avoid log(0)=-inf
        raw_true_wh = K.switch(object_mask, raw_true_wh, K.zeros_like(raw_true_wh))
        box_loss_scale = 2 - y_true[layer][..., 2:3] * y_true[layer][..., 3:4]

        # Find ignore mask, iterate over each of batch.
        ignore_mask = tf.TensorArray(true_dtype, size=1, dynamic_size=True)
        object_mask_bool = K.cast(object_mask, 'bool')

        def loop_body(b, ignore_mask):
            # Ground-truth boxes of sample b, compared with every prediction.
            true_box = tf.boolean_mask(y_true[layer][b, ..., 0:4], object_mask_bool[b, ..., 0])
            iou = box_iou(pred_box[b], true_box)
            best_iou = K.max(iou, axis=-1)
            # 1 where no ground-truth box overlaps the prediction by ignore_thresh.
            ignore_mask = ignore_mask.write(b, K.cast(best_iou < ignore_thresh, K.dtype(true_box)))
            return b + 1, ignore_mask

        # Public tf.while_loop instead of the private control_flow_ops import;
        # ``*_`` avoids shadowing this function's ``args`` parameter.
        _, ignore_mask = tf.while_loop(lambda b, *_: b < m, loop_body, [0, ignore_mask])
        ignore_mask = ignore_mask.stack()
        ignore_mask = K.expand_dims(ignore_mask, -1)

        # K.binary_crossentropy is helpful to avoid exp overflow.
        xy_loss = object_mask * box_loss_scale * K.binary_crossentropy(
            raw_true_xy, raw_pred[..., 0:2], from_logits=True)
        wh_loss = object_mask * box_loss_scale * 0.5 * K.square(raw_true_wh - raw_pred[..., 2:4])
        objectness_bce = K.binary_crossentropy(object_mask, raw_pred[..., 4:5], from_logits=True)
        confidence_loss = object_mask * objectness_bce + \
            (1 - object_mask) * objectness_bce * ignore_mask
        class_loss = object_mask * K.binary_crossentropy(
            true_class_probs, raw_pred[..., 5:], from_logits=True)

        xy_loss = K.sum(xy_loss) / mf
        wh_loss = K.sum(wh_loss) / mf
        confidence_loss = K.sum(confidence_loss) / mf
        # NOTE(review): class_loss is computed but not added to the total
        # below - confirm this is intended.
        class_loss = K.sum(class_loss) / mf
        loss += xy_loss * 10 + wh_loss * 10 + confidence_loss
    return loss
def ctc_lambda_func(args):
    """Define the CTC loss.

    Args:
        args: sequence ``(y_pred, labels, input_length, label_length)`` -
            predictions, labels, LSTM time-step count and label lengths.

    Returns:
        The result of ``my_ctc_batch_cost`` in mode 0.
    """
    predictions, targets, pred_length, target_length = args
    return my_ctc_batch_cost(
        targets, predictions, pred_length, target_length, mode=0)
def my_ctc_batch_cost(y_true, y_pred, input_length, label_length, mode=0):
    """Runs CTC loss algorithm on each batch element.

    Args:
        y_true: tensor `(samples, max_string_length)`
            containing the truth labels.
        y_pred: tensor `(samples, time_steps, num_categories)`
            containing the prediction, or output of the softmax.
        input_length: tensor `(samples, 1)` containing the sequence length for
            each batch item in `y_pred`.
        label_length: tensor `(samples, 1)` containing the sequence length for
            each batch item in `y_true`.
        mode: ``1`` passes the CTC loss through ``focal_ctc``; any other
            value returns the plain CTC loss.

    Returns:
        Tensor with shape (samples,1) containing the
        CTC loss of each element, or the output of ``focal_ctc`` when
        ``mode == 1``.
    """
    seq_lengths = tf.cast(tf.squeeze(input_length, axis=-1), tf.int32)
    target_lengths = tf.cast(tf.squeeze(label_length, axis=-1), tf.int32)
    sparse_labels = tf.cast(
        K.ctc_label_dense_to_sparse(y_true, target_lengths), tf.int32)

    # Time-major log-probabilities; epsilon keeps log() finite.
    time_major = tf.compat.v1.transpose(y_pred, perm=[1, 0, 2])
    log_probs = tf.math.log(time_major + K.epsilon())

    per_sample = tf.compat.v1.nn.ctc_loss(inputs=log_probs,
                                          labels=sparse_labels,
                                          sequence_length=seq_lengths,
                                          preprocess_collapse_repeated=False,
                                          ctc_merge_repeated=True)
    per_sample = tf.expand_dims(per_sample, 1)

    if mode != 1:
        return per_sample
    return focal_ctc(sparse_labels, log_probs, seq_lengths, per_sample)
- # @tf.function
def ctc_decode_mse_loss(args):
    """Mean absolute error between predictions and labels placed at the
    time steps selected by a greedy decode.

    Args:
        args: sequence ``(y_pred, labels, input_length, label_length)``;
            only ``y_pred`` and ``labels`` are used.
            Assumes y_pred is (batch, time_step, num_classes) - TODO confirm.

    Returns:
        Loss tensor with a trailing axis of size 1 (mean absolute difference
        reduced over the last two axes).
    """
    num_classes = 35 + 2
    time_step = 11

    y_pred, labels, input_length, label_length = args

    # Greedy class index for every time step.
    y_max = tf.argmax(y_pred, axis=-1, name='raw_prediction')
    # Steps whose predicted index is greater than 0.
    is_char = tf.greater(y_max, 0)
    # Shifted comparison: True where a step repeats into the next one.
    repeated = tf.equal(y_max[:, :-1], y_max[:, 1:])
    # Extra column so the mask has as many columns as y_max again.
    tail = tf.greater(y_max[:, :1], num_classes - 1)
    repeated = tf.concat([repeated, tail], axis=1)
    # Steps kept after removing repeats (a run keeps its last occurrence).
    char_no_rep = tf.math.logical_and(is_char, tf.math.logical_not(repeated))

    # One-hot labels with the class axis at position 1, zero-padded on axis 2.
    labels = tf.cast(labels, tf.int32)
    labels = tf.one_hot(labels, depth=num_classes, axis=1, dtype=tf.float32)
    padding = tf.zeros((K.shape(labels)[0],
                        K.shape(labels)[1],
                        K.shape(y_pred)[2] - K.shape(labels)[2]))
    labels = tf.concat([labels, padding], axis=2)
    # Swap the last two axes.
    labels = tf.compat.v1.transpose(labels, perm=[0, 2, 1])

    # Seed row; the loop below appends one row per sample starting at index 1.
    new_label = tf.zeros((1, time_step, num_classes), dtype=tf.float32)

    @tf.function
    def body(_i, _label):
        sample = char_no_rep[_i, :]
        if sample[0]:
            new_sample = labels[_i:_i+1, 0:1, :]
            new_sample = tf.cast(new_sample, tf.float32)
        else:
            new_sample = tf.zeros((1, 1, num_classes), dtype=tf.float32)

        for j in range(1, time_step):
            step = char_no_rep[_i, j]
            # NOTE(review): k is reset on every step, so the label slice
            # taken below is always index 0 - confirm this is intended.
            k = 0
            if step and k < K.shape(labels)[1]:
                new_sample = tf.concat([new_sample, labels[_i:_i+1, k:k+1, :]], axis=1)
                k += 1
            else:
                new_sample = tf.concat(
                    [new_sample, tf.zeros((1, 1, num_classes), dtype=tf.float32)], axis=1)

        if _i == 0:
            _label = new_sample
        else:
            _label = tf.concat([_label, new_sample], axis=0)
        _i = tf.add(_i, 1)
        return _i, _label

    def cond(_i, _label):
        return tf.less(_i, K.shape(labels)[0])

    # NOTE(review): the counter starts at 1, so sample 0 is represented by
    # the all-zero seed row - confirm this is intended.
    i = tf.constant(1, dtype=tf.int32)
    _, new_label = tf.while_loop(
        cond, body, [i, new_label],
        shape_invariants=[i.get_shape(), tf.TensorShape([None, None, num_classes])])

    loss = tf.reduce_mean(tf.abs((new_label - y_pred)), axis=-1)
    loss = tf.reduce_mean(loss, axis=-1)
    loss = tf.expand_dims(loss, -1)
    return loss
def ctc_decode_mse_loss2(args):
    """Mean absolute error between predictions and a per-time-step target
    built by matching each label against the greedy prediction.

    For every sample a row of ``time_step`` blank indices is created. Each
    label is written at every time step whose greedy prediction equals it
    (and is not blank); a non-blank label with no match is written into the
    first slot that is still blank. The rows are one-hot encoded and
    compared with ``y_pred``.

    Args:
        args: sequence ``(y_pred, labels, input_length, label_length)``;
            only ``y_pred`` and ``labels`` are used.
            Assumes y_pred is (32, 21, 37) and labels is (32, 8), matching
            the constants below - TODO confirm.

    Returns:
        Loss tensor with a trailing axis of size 1.
    """
    batch_size = 32
    num_classes = 35 + 2
    time_step = 21
    label_len = 8
    blank_index = num_classes - 1

    y_pred, labels, input_length, label_length = args
    # Greedy class index for every time step.
    y_max = tf.argmax(y_pred, axis=-1, name='raw_prediction', output_type=tf.int32)
    labels = tf.cast(labels, tf.int32)
    # Target indices, initially all blank.
    new_label = tf.fill((batch_size, time_step), blank_index)

    @tf.function
    def body(_i, _label):
        new_sample = tf.fill((1, time_step), blank_index)
        for j in range(0, label_len):
            matched = False
            # Walk the labels and look for the matching prediction steps.
            for k in range(0, time_step):
                if tf.equal(y_max[_i, k], labels[_i, j]) and tf.not_equal(y_max[_i, k], blank_index):
                    matched = True
                    # Replace column k of new_sample with label j.
                    if k == 0:
                        new_sample = tf.concat(
                            [labels[_i:_i+1, j:j+1], new_sample[:, k+1:]], axis=-1)
                    elif k >= time_step - 1:
                        new_sample = tf.concat(
                            [new_sample[:, :k], labels[_i:_i+1, j:j+1]], axis=-1)
                    else:
                        new_sample = tf.concat(
                            [new_sample[:, :k], labels[_i:_i+1, j:j+1], new_sample[:, k+1:]],
                            axis=-1)
            # A label that was never predicted takes the first blank slot.
            if not matched and tf.not_equal(labels[_i, j], blank_index):
                placed = False
                for k in range(0, time_step):
                    if not placed and tf.equal(new_sample[0, k], blank_index):
                        placed = True
                        if k == 0:
                            new_sample = tf.concat(
                                [labels[_i:_i+1, j:j+1], new_sample[:, k+1:]], axis=-1)
                        elif k >= time_step - 1:
                            new_sample = tf.concat(
                                [new_sample[:, :k], labels[_i:_i+1, j:j+1]], axis=-1)
                        else:
                            new_sample = tf.concat(
                                [new_sample[:, :k], labels[_i:_i+1, j:j+1], new_sample[:, k+1:]],
                                axis=-1)

        tf.print("y_max[_i]", y_max[_i], summarize=100)
        tf.print("new_samele", new_sample, summarize=100)
        tf.print("labels[_i]", labels[_i], summarize=100)
        tf.print("loss", tf.reduce_mean(tf.abs((y_max[_i]-new_sample)), axis=-1))

        # Store the row for sample _i. From index time_step - 1 onwards the
        # rows after _i are dropped and the new row is appended instead.
        if _i == 0:
            _label = tf.concat([new_sample[:, :], _label[_i+1:, :]], axis=0)
        elif _i >= time_step - 1:
            _label = tf.concat([_label[:_i, :], new_sample[:, :]], axis=0)
        else:
            _label = tf.concat([_label[:_i, :], new_sample[:, :], _label[_i+1:, :]], axis=0)
        _i = tf.add(_i, 1)
        return _i, _label

    def cond(_i, _label):
        return tf.less(_i, K.shape(labels)[0])

    # NOTE(review): the counter starts at 1, so row 0 keeps its all-blank
    # initial value - confirm this is intended.
    i = tf.constant(1, dtype=tf.int32)
    _, new_label = tf.while_loop(
        cond, body, [i, new_label],
        shape_invariants=[i.get_shape(), tf.TensorShape([None, None])])

    # One-hot with the class axis at position 1, then swap the last two axes.
    new_label = tf.one_hot(new_label, depth=num_classes, axis=1, dtype=tf.float32)
    new_label = tf.compat.v1.transpose(new_label, perm=[0, 2, 1])

    loss = tf.reduce_mean(tf.abs((new_label - y_pred)), axis=-1)
    loss = tf.reduce_mean(loss * 1, axis=-1)
    loss = tf.expand_dims(loss, -1)
    return loss
class CtcDecodeMseLoss(Layer):
    """Keras layer computing a mean-absolute-error between predictions and
    labels placed at the time steps selected by a greedy decode.

    The layer creates no weights. ``call`` is hard-coded for 32 samples,
    21 time steps and 37 classes.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        # No weights are created; only mark the layer as built.
        super().build(input_shape)  # Be sure to call this somewhere!

    def call(self, inputs):
        """Compute the loss.

        Args:
            inputs: sequence ``(y_pred, labels, input_length, label_length)``;
                only ``y_pred`` and ``labels`` are used.
                Assumes y_pred is (32, 21, 37) - TODO confirm.

        Returns:
            Scalar tensor: mean of ``|new_label - y_pred| * 100``.
        """
        y_pred, labels, input_length, label_length = inputs
        # Greedy class index for every time step.
        y_max = tf.argmax(y_pred, axis=-1, name='raw_prediction')
        num_classes = 35 + 2
        # Steps whose predicted index is greater than 0.
        is_char = tf.greater(y_max, 0)
        # Shifted comparison: True where a step repeats into the next one.
        char_rep = tf.equal(y_max[:, :-1], y_max[:, 1:])
        tail = tf.greater(y_max[:, :1], num_classes - 1)
        char_rep = tf.concat([char_rep, tail], axis=1)
        # Steps kept after removing repeats (a run keeps its last occurrence).
        char_no_rep = tf.math.logical_and(is_char, tf.math.logical_not(char_rep))

        # One-hot labels with the class axis at position 1, zero-padded on axis 2.
        labels = tf.cast(labels, tf.int32)
        labels = tf.one_hot(labels, depth=37, axis=1, dtype=tf.float32)
        labels = tf.concat(
            [labels,
             tf.zeros((K.shape(labels)[0], K.shape(labels)[1],
                       K.shape(y_pred)[2] - K.shape(labels)[2]))],
            axis=2)
        # Swap the last two axes.
        labels = tf.compat.v1.transpose(labels, perm=[0, 2, 1])

        for i in range(32):
            sample = char_no_rep[i, :]
            if sample[0]:
                new_sample = labels[i:i+1, 0:1, :]
                new_sample = tf.cast(new_sample, tf.float32)
            else:
                new_sample = tf.zeros((1, 1, 37), dtype=tf.float32)
            for j in range(1, 21):
                step = char_no_rep[i, j]
                # NOTE(review): k is reset on every step, so the label slice
                # taken below is always index 0 - confirm this is intended.
                k = 0
                if step and k < K.shape(labels)[1]:
                    new_sample = tf.concat([new_sample, labels[i:i+1, k:k+1, :]], axis=1)
                    k += 1
                else:
                    new_sample = tf.concat(
                        [new_sample, tf.zeros((1, 1, 37), dtype=tf.float32)], axis=1)
            if i == 0:
                new_label = new_sample
            else:
                new_label = tf.concat([new_label, new_sample], axis=0)

        loss = tf.reduce_mean(tf.abs((new_label - y_pred) * 100))
        print("loss2", loss.shape)
        return loss

    def compute_output_shape(self, input_shape):
        """Return ``(batch, 1)`` using the batch dimension of the first input.

        ``input_shape`` is the list of shapes of the four inputs, so the
        batch size is read from it directly; calling ``K.shape`` on that
        list does not give a static shape tuple.
        NOTE(review): ``call`` currently returns a scalar - confirm which
        output shape is intended.
        """
        return (input_shape[0][0], 1)
def focal_ctc(targets, logits, seq_len, ctc_loss, alpha=0.8, gamma=2.0):
    """Apply focal weighting to an already computed CTC loss.

    Computes ``alpha * (1 - p) ** gamma * ctc_loss`` with
    ``p = exp(-ctc_loss)`` and averages the result.

    Args:
        targets: unused.
        logits: unused.
        seq_len: unused.
        ctc_loss: CTC loss tensor to re-weight.
        alpha: scale factor (default 0.8).
        gamma: exponent of the ``1 - p`` factor (default 2.0).

    Returns:
        Scalar tensor, the mean focal CTC loss.
    """
    prob = tf.exp(-ctc_loss)
    modulating = tf.pow((1 - prob), gamma)
    focal_ctc_loss = tf.multiply(tf.multiply(alpha, modulating), ctc_loss)
    return tf.reduce_mean(focal_ctc_loss)
def ctc_center_loss(labels, features, _lambda=0.0005):
    """Softmax cross-entropy plus a weighted center loss.

    Args:
        labels: labels; cast to float32 and used as the target of the
            categorical cross-entropy, with their argmax used for the
            center loss.
        features: prediction tensor; its argmax over axis 2 selects the
            positions kept after removing repeats.
        _lambda: weight of the center-loss term.

    Returns:
        ``s_loss + _lambda * c_loss``.
    """
    def center_loss(labels, features, alpha=0.6, num_classes=240):
        """Compute the center loss and update the class centers.

        :param labels: Tensor of sample labels, not one-hot; flattened to 1-D here.
        :param features: Tensor of sample features; its second dimension is the feature length.
        :param alpha: number between 0 and 1 controlling the learning rate of the class centers.
        :param num_classes: integer, number of classes (rows of the centers variable).
        :return: Tensor, the center loss (``tf.nn.l2_loss`` of features minus centers).
        """
        # Feature dimensionality, e.g. 256.
        len_features = features.get_shape()[1]
        # Variable of shape [num_classes, len_features] holding the class
        # centers; trainable=False because centers are not updated by gradients.
        centers = tf.compat.v1.get_variable('centers', [num_classes, len_features], dtype=tf.float32,
                                            initializer=tf.constant_initializer(0), trainable=False)
        # Flatten the labels to 1-D.
        labels = tf.reshape(labels, [-1])
        # Center of each sample in the mini-batch, looked up by label.
        centers_batch = tf.gather(centers, labels)
        # Difference between each sample's center and its features.
        diff = centers_batch - features
        # How often each label occurs in the mini-batch.
        unique_label, unique_idx, unique_count = tf.unique_with_counts(labels)
        appear_times = tf.gather(unique_count, unique_idx)
        appear_times = tf.reshape(appear_times, [-1, 1])
        diff = diff / tf.cast((1 + appear_times), tf.float32)
        diff = alpha * diff
        # Update the centers.
        centers_update_op = tf.compat.v1.scatter_sub(centers, labels, diff)
        # The control dependency makes the update run with the loss.
        with tf.control_dependencies([centers_update_op]):
            c_loss = tf.nn.l2_loss(features - centers_batch)
        return c_loss

    def get_slice(pos):
        # Reads ``features`` from the enclosing scope (the original argument
        # at the time map_fn runs).
        return features[pos[1], pos[0], :]

    num_classes = 35 + 2
    # Greedy class index per step.
    raw_pred = tf.argmax(features, axis=2, name='raw_prediction')
    # Steps whose predicted index is greater than 0.
    is_char = tf.greater(raw_pred, 0)
    # Shifted comparison: True where a step repeats into the next one.
    repeated = tf.equal(raw_pred[:, :-1], raw_pred[:, 1:])
    tail = tf.greater(raw_pred[:, :1], num_classes - 1)
    repeated = tf.concat([repeated, tail], axis=1)
    # Steps kept after removing repeats (a run keeps its last occurrence).
    char_no_rep = tf.math.logical_and(is_char, tf.math.logical_not(repeated))

    # NOTE(review): boolean_mask returns the selected feature vectors, which
    # get_slice then uses as indices - confirm this is intended.
    char_pos = tf.boolean_mask(features, char_no_rep)
    char_features = tf.map_fn(get_slice, char_pos, dtype=tf.float32)

    labels = K.cast(labels, dtype=tf.float32)
    # softmax loss
    s_loss = K.categorical_crossentropy(labels, K.softmax(char_features, axis=-1))
    # center loss
    c_loss = center_loss(K.argmax(labels, axis=-1), char_features)
    return s_loss + _lambda * c_loss
def ctc_center_accuracy(y_true, y_pred):
    """Categorical accuracy for a model whose softmax layer was removed.

    :param y_true: the labels.
    :param y_pred: the features (softmax is applied here).
    :return: per-element accuracy, 1.0 where the argmax of ``y_true`` equals
        the argmax of the softmaxed ``y_pred``, else 0.0.
    """
    # Softmax over the last axis of the predictions.
    probabilities = K.softmax(y_pred, axis=-1)
    true_class = K.argmax(y_true, axis=-1)
    predicted_class = K.argmax(probabilities, axis=-1)
    return K.cast(K.equal(true_class, predicted_class), K.floatx())
def ctc_accuracy(y_true, y_pred):
    """Mean edit distance between the beam-search decode and the labels.

    NOTE(review): despite the name this returns a label error rate, not an
    accuracy; ``input_length`` is the constant 21 - confirm both are intended.
    """
    # Decode with the CTC beam-search decoder.
    decoded = K.ctc_decode(y_pred, input_length=21, greedy=False, beam_width=6)
    # Edit distance between the decoded sequences and the labels.
    hypothesis = tf.cast(decoded[0], tf.int32)
    distance = tf.edit_distance(hypothesis, y_true)
    # Label error rate.
    return tf.reduce_mean(distance, name='label_error_rate')
def perceptual_loss(gamma=2., alpha=.25):
    """Create a perceptual loss based on VGG19 feature maps.

    The returned loss averages six ``l2_focal_loss`` (reverse mode) terms:
    one on the channel-tripled images and one for each of the VGG19
    activations ``conv1_1`` ... ``conv5_1``.

    Args:
        gamma: unused.
        alpha: unused.

    Returns:
        A ``(y_true, y_pred)`` loss callable.
    """
    from click_captcha.model import Vgg19

    def perceptual_loss_fixed(y_true, y_pred):
        # The VGG19 instance is cached in the module globals under "vgg".
        vgg = globals().get("vgg")
        if vgg is None:
            vgg = Vgg19("./vgg19.npy")
            globals().update({"vgg": vgg})
            print("init vgg19 success!")

        # Repeat the input three times along the last axis.
        y_pred = tf.concat([y_pred] * 3, -1)
        y_true = tf.concat([y_true] * 3, -1)

        layer_names = ("conv1_1", "conv2_1", "conv3_1", "conv4_1", "conv5_1")

        # build() overwrites the conv attributes, so the target activations
        # are collected before building on the prediction.
        vgg.build(y_true)
        true_features = [getattr(vgg, name) for name in layer_names]
        vgg.build(y_pred)
        pred_features = [getattr(vgg, name) for name in layer_names]

        criterion = l2_focal_loss(threshold=0.2, ratio=1000, reverse=True)
        total = criterion(y_true, y_pred)
        for true_map, pred_map in zip(true_features, pred_features):
            total = total + criterion(true_map, pred_map)
        return total / 6

    return perceptual_loss_fixed
|