import os
import sys
import tensorflow as tf
import keras.backend as K
from keras import Input
import numpy as np
np.set_printoptions(threshold=np.inf)
from keras.engine.base_layer import Layer
from tensorflow.python.ops.control_flow_ops import while_loop

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from click_captcha.utils import box_iou
from click_captcha.post_process import yolo_head


def contrastive_loss(y_true, y_pred):
    """Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf

    Pairs with ``y_true == 1`` are penalised by the squared prediction,
    pairs with ``y_true == 0`` by the squared hinge ``max(margin - y_pred, 0)``
    with a fixed margin of 1. Returns the mean over all elements.
    """
    margin = 1
    positive_term = y_true * K.square(y_pred)
    negative_term = (1 - y_true) * K.square(K.maximum(margin - y_pred, 0))
    return K.mean(positive_term + negative_term)


def focal_loss(gamma=3., alpha=.5, only_tf=True):
    """Build a binary focal loss function.

    :param gamma: focusing exponent applied to ``(1 - p)`` / ``p``.
    :param alpha: weight of the positive term; negatives get ``1 - alpha``.
    :param only_tf: use ``tf`` ops (epsilon 1e-07) when True, otherwise the
        Keras backend ops with ``K.epsilon()``.
    :return: a ``loss(y_true, y_pred)`` callable returning the summed loss.
    """
    def focal_loss_fixed(y_true, y_pred):
        # Predictions at positive labels (1 elsewhere, so log() contributes 0).
        pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
        # Predictions at negative labels (0 elsewhere, so pow() contributes 0).
        pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))

        if only_tf:
            return - tf.reduce_sum(alpha * tf.pow(1. - pt_1, gamma) * tf.math.log(1e-07 + pt_1)) \
                   - tf.reduce_sum((1 - alpha) * tf.pow(pt_0, gamma) * tf.math.log(1. - pt_0 + 1e-07))
        else:
            return - K.sum(alpha * K.pow(1. - pt_1, gamma) * K.log(K.epsilon()+pt_1)) \
                   - K.sum((1 - alpha) * K.pow(pt_0, gamma) * K.log(1. - pt_0 + K.epsilon()))
    return focal_loss_fixed


def l1_loss():
    """Build a mean-absolute-error loss scaled by 100."""
    def mae(y_true, y_pred):
        return tf.reduce_mean(tf.abs(y_pred-y_true)) * 100
    return mae


def l2_loss():
    """Build a mean-squared-error loss."""
    def mse(y_true, y_pred):
        return tf.reduce_mean(tf.square(y_true - y_pred))
    return mse


def l2_focal_loss(threshold=0.2, ratio=1000, reverse=False):
    """Build a squared-error loss that re-weights errors around ``threshold``.

    :param threshold: absolute-error boundary between "small" and "large".
    :param ratio: scale factor. With ``reverse=False`` large errors are
        multiplied by ``ratio``; with ``reverse=True`` small errors are
        multiplied by ``1 / ratio`` and large errors by 0.1.
    :param reverse: selects which of the two weightings above is used.
    :return: a ``loss(y_true, y_pred)`` callable returning the mean of the
        squared, re-weighted absolute errors.
    """
    def mse(y_true, y_pred):
        abs_err = tf.abs(y_pred-y_true)
        is_small = abs_err <= threshold
        if reverse:
            y_minus = tf.where(is_small, 1/ratio*abs_err, 0.1*abs_err)
        else:
            y_minus = tf.where(is_small, abs_err, ratio*abs_err)
        return tf.reduce_mean(tf.square(y_minus))
    return mse


def l1_focal_loss(threshold=0.2):
    """Build an absolute-error loss that ignores errors up to ``threshold``.

    Errors ``<= threshold`` are zeroed; the remaining absolute errors are
    summed (not averaged).
    """
    def mae(y_true, y_pred):
        abs_err = tf.abs(y_pred-y_true)
        y_minus = tf.where(abs_err <= threshold, 0., abs_err)
        return tf.reduce_sum(tf.abs(y_minus))
    return mae


def l3_loss():
    """Build a loss equal to the mean absolute cubed error."""
    def l3_loss_fixed(y_true, y_pred):
        return tf.reduce_mean(tf.abs(tf.pow(y_pred-y_true, 3)))
    return l3_loss_fixed


def yolo_loss(args, anchors, num_classes, ignore_thresh=.5, print_loss=False):
    """Return yolo_loss tensor

    Parameters
    ----------
    args: list of tensor; the first ``len(anchors)//3`` entries are used as
        yolo_outputs (the output of yolo_body or tiny_yolo_body), the
        remaining entries as y_true (the output of preprocess_true_boxes)
    anchors: array, shape=(N, 2), wh
    num_classes: integer
    ignore_thresh: float, the iou threshold whether to ignore object confidence loss
    print_loss: bool, print the individual loss terms when True

    Returns
    -------
    loss: tensor, shape=(1,)

    """
    # default setting
    num_layers = len(anchors)//3
    yolo_outputs = args[:num_layers]
    y_true = args[num_layers:]
    anchor_mask = [[6, 7, 8], [3, 4, 5], [0, 1, 2]] if num_layers == 3 else [[3, 4, 5], [1, 2, 3]]
    input_shape = K.cast(K.shape(yolo_outputs[0])[1:3] * 32, K.dtype(y_true[0]))
    grid_shapes = [K.cast(K.shape(yolo_outputs[l])[1:3], K.dtype(y_true[0])) for l in range(num_layers)]
    loss = 0
    # batch size, tensor
    m = K.shape(yolo_outputs[0])[0]
    mf = K.cast(m, K.dtype(yolo_outputs[0]))

    for l in range(num_layers):
        object_mask = y_true[l][..., 4:5]
        true_class_probs = y_true[l][..., 5:]

        grid, raw_pred, pred_xy, pred_wh = yolo_head(yolo_outputs[l],
                                                     anchors[anchor_mask[l]],
                                                     num_classes, input_shape,
                                                     calc_loss=True)
        pred_box = K.concatenate([pred_xy, pred_wh])

        # Darknet raw box to calculate loss.
        raw_true_xy = y_true[l][..., :2]*grid_shapes[l][::-1] - grid
        raw_true_wh = K.log(y_true[l][..., 2:4] / anchors[anchor_mask[l]] * input_shape[::-1])
        # avoid log(0)=-inf
        raw_true_wh = K.switch(object_mask, raw_true_wh, K.zeros_like(raw_true_wh))
        box_loss_scale = 2 - y_true[l][..., 2:3]*y_true[l][..., 3:4]

        # Find ignore mask, iterate over each of batch.
        ignore_mask = tf.TensorArray(K.dtype(y_true[0]), size=1, dynamic_size=True)
        object_mask_bool = K.cast(object_mask, 'bool')

        def loop_body(b, ignore_mask):
            true_box = tf.boolean_mask(y_true[l][b, ..., 0:4], object_mask_bool[b,...,0])
            iou = box_iou(pred_box[b], true_box)
            best_iou = K.max(iou, axis=-1)
            # NOTE(review): the reviewed text is truncated right after
            # `K.cast(best_iou` on the next statement. Everything from that
            # point to the end of yolo_loss is restored from the reference
            # keras-yolo3 implementation, which the visible part of this
            # function matches -- TODO confirm against version control.
            ignore_mask = ignore_mask.write(b, K.cast(best_iou < ignore_thresh, K.dtype(true_box)))
            return b+1, ignore_mask

        _, ignore_mask = while_loop(lambda b, *args: b < m, loop_body, [0, ignore_mask])
        ignore_mask = ignore_mask.stack()
        ignore_mask = K.expand_dims(ignore_mask, -1)

        # K.binary_crossentropy is helpful to avoid exp overflow.
        xy_loss = object_mask * box_loss_scale * K.binary_crossentropy(
            raw_true_xy, raw_pred[..., 0:2], from_logits=True)
        wh_loss = object_mask * box_loss_scale * 0.5 * K.square(raw_true_wh-raw_pred[..., 2:4])
        confidence_loss = object_mask * K.binary_crossentropy(
            object_mask, raw_pred[..., 4:5], from_logits=True) + \
            (1-object_mask) * K.binary_crossentropy(
                object_mask, raw_pred[..., 4:5], from_logits=True) * ignore_mask
        class_loss = object_mask * K.binary_crossentropy(
            true_class_probs, raw_pred[..., 5:], from_logits=True)

        xy_loss = K.sum(xy_loss) / mf
        wh_loss = K.sum(wh_loss) / mf
        confidence_loss = K.sum(confidence_loss) / mf
        class_loss = K.sum(class_loss) / mf
        loss += xy_loss + wh_loss + confidence_loss + class_loss
        if print_loss:
            loss = tf.compat.v1.Print(
                loss,
                [loss, xy_loss, wh_loss, confidence_loss, class_loss, K.sum(ignore_mask)],
                message='loss: ')
    return loss


# ---------------------------------------------------------------------------
# NOTE(review): another function was defined here, but its `def` line and the
# first part of its body are missing from the reviewed text (the source is cut
# between `K.cast(best_iou` in yolo_loss and the statements below). Its name,
# signature and the setup of `y_max`, `labels`, `time_step`, `blank_index`,
# `num_classes`, `y_pred`, `new_label`, `new_sample` and `find_flag` cannot be
# recovered from here, so the surviving statements are preserved as comments
# instead of being wrapped in an invented definition. Nesting is inferred from
# statement order; previously commented-out debug code was dropped.
# TODO: restore this function from version control.
# Things to check when restoring: the while-loop counter starts at 1 (sample 0
# is never visited) and the batch index `_i` is compared with `time_step-1`.
#
#             for k in ...:   # (loop headers lost)
#                 # Loop over the labels and find the matching y_pred step;
#                 # labels that were missed overwrite a blank (0) position.
#                 if tf.equal(y_max[_i, k], labels[_i, j]) and tf.not_equal(y_max[_i, k], blank_index):
#                     find_flag = True
#                     if k == 0:
#                         new_sample = tf.concat([labels[_i:_i+1, j:j+1], new_sample[:, k+1:]], axis=-1)
#                     elif k >= time_step-1:
#                         new_sample = tf.concat([new_sample[:, :k], labels[_i:_i+1, j:j+1]], axis=-1)
#                     else:
#                         new_sample = tf.concat([new_sample[:, :k], labels[_i:_i+1, j:j+1], new_sample[:, k+1:]], axis=-1)
#             if not find_flag and tf.not_equal(labels[_i, j], blank_index):
#                 find_flag2 = False
#                 for k in range(0, time_step):
#                     if not find_flag2 and tf.equal(new_sample[0, k], blank_index):
#                         find_flag2 = True
#                         if k == 0:
#                             new_sample = tf.concat([labels[_i:_i+1, j:j+1], new_sample[:, k+1:]], axis=-1)
#                         elif k >= time_step-1:
#                             new_sample = tf.concat([new_sample[:, :k], labels[_i:_i+1, j:j+1]], axis=-1)
#                         else:
#                             new_sample = tf.concat([new_sample[:, :k], labels[_i:_i+1, j:j+1], new_sample[:, k+1:]], axis=-1)
#         tf.print("y_max[_i]", y_max[_i], summarize=100)
#         tf.print("new_samele", new_sample, summarize=100)
#         tf.print("labels[_i]", labels[_i], summarize=100)
#         tf.print("loss", tf.reduce_mean(tf.abs((y_max[_i]-new_sample)), axis=-1))
#         if _i == 0:
#             _label = tf.concat([new_sample[:, :], _label[_i+1:, :]], axis=0)
#         elif _i >= time_step-1:
#             _label = tf.concat([_label[:_i, :], new_sample[:, :]], axis=0)
#         else:
#             _label = tf.concat([_label[:_i, :], new_sample[:, :], _label[_i+1:, :]], axis=0)
#         _i = tf.add(_i, 1)
#         return _i, _label
#
#     def cond(_i, _label):
#         return tf.less(_i, K.shape(labels)[0])
#
#     i = tf.constant(1, dtype=tf.int32)
#     _, new_label = tf.while_loop(cond, body, [i, new_label],
#                                  shape_invariants=[i.get_shape(), tf.TensorShape([None, None])])
#     new_label = tf.one_hot(new_label, depth=num_classes, axis=1, dtype=tf.float32)
#     new_label = tf.compat.v1.transpose(new_label, perm=[0, 2, 1])
#     loss = tf.reduce_mean(tf.abs((new_label-y_pred)), axis=-1)
#     loss = tf.reduce_mean(loss*1, axis=-1)
#     loss = tf.expand_dims(loss, -1)
#     return loss
# ---------------------------------------------------------------------------


class CtcDecodeMseLoss(Layer):
    """Layer that scores CTC predictions against a label tensor rebuilt
    step by step from the greedy (argmax) decoding.

    NOTE(review): ``call`` hard-codes a batch of 32, 21 time steps and 37
    classes, and branches in Python on tensor values, so it presumably only
    works in eager mode with exactly those sizes -- confirm with the caller.
    """

    def __init__(self, **kwargs):
        super(CtcDecodeMseLoss, self).__init__(**kwargs)

    def build(self, input_shape):
        # No weights of its own; just mark the layer as built.
        super(CtcDecodeMseLoss, self).build(input_shape)  # Be sure to call this somewhere!

    def call(self, inputs):
        """Return the scalar mean of ``|new_label - y_pred| * 100``.

        :param inputs: ``[y_pred, labels, input_length, label_length]``;
            only ``y_pred`` and ``labels`` are used.
        """
        # y_pred [32, 21, 37]
        y_pred, labels, input_length, label_length = inputs

        # y_max [32, 21]
        y_max = tf.argmax(y_pred, axis=-1, name='raw_prediction')
        num_classes = 35+2

        # Whether each step predicts a character (class index > 0).
        is_char = tf.greater(y_max, 0)
        # Shifted comparison: find steps that repeat the following step.
        char_rep = tf.equal(y_max[:, :-1], y_max[:, 1:])
        # Always-False column (argmax can never exceed num_classes - 1) that
        # pads char_rep back to the full number of steps.
        tail = tf.greater(y_max[:, :1], num_classes - 1)
        char_rep = tf.concat([char_rep, tail], axis=1)
        # Character positions after removing repeats; a repeated character
        # keeps the position of its last occurrence.
        # [32, 21]
        char_no_rep = tf.math.logical_and(is_char, tf.math.logical_not(char_rep))

        # [32, 37, 21]
        labels = tf.cast(labels, tf.int32)
        labels = tf.one_hot(labels, depth=37, axis=1, dtype=tf.float32)
        padding = tf.zeros((K.shape(labels)[0],
                            K.shape(labels)[1],
                            K.shape(y_pred)[2]-K.shape(labels)[2]))
        labels = tf.concat([labels, padding], axis=2)
        # [32, 21, 37]
        labels = tf.compat.v1.transpose(labels, perm=[0, 2, 1])

        for i in range(32):
            sample = char_no_rep[i, :]
            if sample[0]:
                new_sample = labels[i:i+1, 0:1, :]
                new_sample = tf.cast(new_sample, tf.float32)
            else:
                new_sample = tf.zeros((1, 1, 37), dtype=tf.float32)
            for j in range(1, 21):
                step = char_no_rep[i, j]
                # NOTE(review): k is reset on every step, so the branch below
                # always copies label step 0 and the `k += 1` has no lasting
                # effect -- looks unintended, confirm before changing.
                k = 0
                if step and k < K.shape(labels)[1]:
                    new_sample = tf.concat([new_sample, labels[i:i+1, k:k+1, :]], axis=1)
                    k += 1
                else:
                    new_sample = tf.concat([new_sample, tf.zeros((1, 1, 37), dtype=tf.float32)], axis=1)
            if i == 0:
                new_label = new_sample
            else:
                new_label = tf.concat([new_label, new_sample], axis=0)

        loss = tf.reduce_mean(tf.abs((new_label-y_pred)*100))
        print("loss2", loss.shape)
        return loss

    def compute_output_shape(self, input_shape):
        """Return ``(batch, 1)``.

        ``input_shape`` is the list of shapes of the four inputs; the batch
        dimension is read from the first one. (The previous version called
        ``K.shape`` on this Python list, which is not a tensor.)
        """
        return (input_shape[0][0], 1)


def focal_ctc(targets, logits, seq_len, ctc_loss, alpha=0.8, gamma=2.0):
    """Focal re-weighting of an already computed CTC loss.

    Computes ``mean(alpha * (1 - p) ** gamma * ctc_loss)`` with
    ``p = exp(-ctc_loss)``.

    :param targets: unused here (kept for the commented-out ctc_loss call).
    :param logits: unused here.
    :param seq_len: unused here.
    :param ctc_loss: per-sample CTC loss tensor.
    :param alpha: focal weight, default 0.8.
    :param gamma: focusing exponent, default 2.0.
    :return: scalar loss tensor.
    """
    # ctc_loss = tf.compat.v1.nn.ctc_loss(labels=targets, inputs=logits, sequence_length=seq_len, time_major=True)
    p = tf.exp(-ctc_loss)
    # ((alpha)*((1-p)**gamma)*(ctc_loss))
    focal_ctc_loss = tf.multiply(tf.multiply(alpha, tf.pow((1-p), gamma)), ctc_loss)
    loss = tf.reduce_mean(focal_ctc_loss)
    return loss


def ctc_center_loss(labels, features, _lambda=0.0005):
    """Softmax cross-entropy plus ``_lambda`` times a center loss, computed
    on the features at de-duplicated, non-blank argmax positions.

    :param labels: label tensor; cast to float32 for the cross-entropy and
        arg-maxed over the last axis for the center loss.
    :param features: prediction tensor, arg-maxed over axis 2.
    :param _lambda: weight of the center-loss term.
    """
    def center_loss(labels, features, alpha=0.6, num_classes=240):
        """
        Compute the center loss and update the per-class centers.
        :param labels: Tensor of sample labels, not one-hot, shape should be (batch_size,).
        :param features: Tensor of sample features (output of the last fc layer),
            shape should be (batch_size, num_classes).
        :param alpha: number in 0-1 controlling the learning rate of the class
            centers; see the original paper for details.
        :param num_classes: integer, total number of classes; use the number of
            output neurons of the classifier.
        :return: Tensor, the center loss (``tf.nn.l2_loss`` of the differences).
        """
        # Feature dimensionality, e.g. 256.
        len_features = features.get_shape()[1]
        # Variable of shape [num_classes, len_features] holding the class centers.
        # trainable=False because the centers are not updated by gradients.
        centers = tf.compat.v1.get_variable('centers', [num_classes, len_features], dtype=tf.float32,
                                            initializer=tf.constant_initializer(0), trainable=False)
        # Flatten the labels (a no-op if they are already 1-D).
        labels = tf.reshape(labels, [-1])

        # Center of each sample's class in this mini-batch.
        centers_batch = tf.gather(centers, labels)
        # Difference between the centers and the mini-batch features.
        diff = centers_batch - features

        # Number of occurrences of each sample's class in the mini-batch;
        # see equation (4) of the original paper.
        unique_label, unique_idx, unique_count = tf.unique_with_counts(labels)
        appear_times = tf.gather(unique_count, unique_idx)
        appear_times = tf.reshape(appear_times, [-1, 1])

        diff = diff / tf.cast((1 + appear_times), tf.float32)
        diff = alpha * diff

        # Update the centers.
        centers_update_op = tf.compat.v1.scatter_sub(centers, labels, diff)

        # Make the loss depend on the update so the centers are refreshed.
        with tf.control_dependencies([centers_update_op]):
            # Compute the center loss.
            c_loss = tf.nn.l2_loss(features - centers_batch)

        return c_loss

    def get_slice(pos):
        # Reads `features` from the enclosing scope at call time.
        feature_one_char = features[pos[1], pos[0], :]
        return feature_one_char

    num_classes = 35+2
    # Whether each step predicts a character (class index > 0).
    raw_pred = tf.argmax(features, axis=2, name='raw_prediction')
    is_char = tf.greater(raw_pred, 0)
    # Shifted comparison: find steps that repeat the following step.
    char_rep = tf.equal(raw_pred[:, :-1], raw_pred[:, 1:])
    tail = tf.greater(raw_pred[:, :1], num_classes - 1)
    char_rep = tf.concat([char_rep, tail], axis=1)
    # Character positions after removing repeats; a repeated character keeps
    # the position of its last occurrence.
    char_no_rep = tf.math.logical_and(is_char, tf.math.logical_not(char_rep))
    # NOTE(review): boolean_mask returns the selected feature vectors, yet
    # get_slice indexes with them as if they were positions -- confirm intent.
    char_pos = tf.boolean_mask(features, char_no_rep)
    features = tf.map_fn(get_slice, char_pos, dtype=tf.float32)

    labels = K.cast(labels, dtype=tf.float32)
    # softmax loss
    s_loss = K.categorical_crossentropy(labels, K.softmax(features, axis=-1))
    # center loss
    c_loss = center_loss(K.argmax(labels, axis=-1), features)
    return s_loss + _lambda * c_loss


def ctc_center_accuracy(y_true, y_pred):
    """
    Re-implementation of categorical_accuracy for a model whose softmax
    layer has been removed.
    :param y_true: same as labels.
    :param y_pred: same as features.
    :return: accuracy (per-element 0/1 match of the arg-maxes, as floatx).
    """
    # Softmax of y_pred.
    sm_y_pred = K.softmax(y_pred, axis=-1)
    # Element-wise match between the true and predicted classes.
    return K.cast(K.equal(K.argmax(y_true, axis=-1), K.argmax(sm_y_pred, axis=-1)), K.floatx())


def ctc_accuracy(y_true, y_pred):
    """Return the mean edit distance between the beam-search CTC decoding of
    ``y_pred`` and ``y_true`` (a label error rate, despite the name).

    NOTE(review): ``input_length`` is passed as the scalar 21 and
    ``decoded[0]`` is fed to ``tf.edit_distance`` directly -- check both
    against the K.ctc_decode / tf.edit_distance argument contracts.
    """
    # Decode with the CTC decoder.
    decoded = K.ctc_decode(y_pred, input_length=21, greedy=False, beam_width=6)
    # Edit distance between the decoding and the labels.
    distance = tf.edit_distance(tf.cast(decoded[0], tf.int32), y_true)
    # Label error rate.
    label_error_rate = tf.reduce_mean(distance, name='label_error_rate')
    return label_error_rate


def perceptual_loss(gamma=2., alpha=.25):
    """Build a perceptual loss that averages a reversed ``l2_focal_loss`` over
    the raw images and five VGG19 feature maps.

    :param gamma: unused.
    :param alpha: unused.
    :return: a ``loss(y_true, y_pred)`` callable.
    """
    from click_captcha.model import Vgg19

    feature_names = ("conv1_1", "conv2_1", "conv3_1", "conv4_1", "conv5_1")

    def perceptual_loss_fixed(y_true, y_pred):
        # The Vgg19 instance is cached in the module globals under "vgg" so
        # the weights file is only loaded once.
        if globals().get("vgg") is None:
            vgg = Vgg19("./vgg19.npy")
            globals().update({"vgg": vgg})
            print("init vgg19 success!")
        else:
            vgg = globals().get("vgg")

        # Repeat the inputs three times along the last axis before feeding VGG.
        y_pred = tf.concat([y_pred, y_pred, y_pred], -1)
        y_true = tf.concat([y_true, y_true, y_true], -1)

        # vgg.build() overwrites the conv* attributes, so the target features
        # must be read before building on the prediction.
        vgg.build(y_true)
        true_features = [getattr(vgg, layer) for layer in feature_names]
        vgg.build(y_pred)
        pred_features = [getattr(vgg, layer) for layer in feature_names]

        # Same configuration for every term, so build the loss closure once.
        term_loss = l2_focal_loss(threshold=0.2, ratio=1000, reverse=True)

        total = term_loss(y_true, y_pred)
        for true_map, pred_map in zip(true_features, pred_features):
            total = total + term_loss(true_map, pred_map)
        return total / 6
    return perceptual_loss_fixed