|
- '''
- Created on 2019年2月25日
- @author: User
- '''
- from keras import models
- from keras import layers
- from keras import optimizers, losses, metrics
- from keras.callbacks import ModelCheckpoint
- from module.Utils import *
- import keras.backend as K
- import tensorflow as tf
- import six
- import math
def gelu(x):
    """Gaussian Error Linear Unit (tanh approximation).

    This is a smoother version of the RELU.
    Original paper: https://arxiv.org/abs/1606.08415

    Args:
      x: float Tensor to perform activation.

    Returns:
      `x` with the GELU activation applied.
    """
    # The scaling constant is a plain Python float computed with `math`,
    # which this module imports explicitly; no NumPy alias is imported by
    # name in this file, so we avoid depending on one.
    coeff = math.sqrt(2.0 / math.pi)
    cdf = 0.5 * (1.0 + tf.tanh(coeff * (x + 0.044715 * tf.pow(x, 3))))
    return x * cdf
def shape_list(x):
    """List the dimensions of `x`, using static sizes wherever they are known.

    Args:
      x: value convertible to a Tensor.

    Returns:
      The dynamic shape Tensor when the rank is unknown; otherwise a Python
      list holding the static size for each known dimension and a scalar
      slice of the dynamic shape for each unknown one.
    """
    tensor = tf.convert_to_tensor(x)
    # With an unknown rank there is nothing to enumerate: hand back the
    # dynamic shape tensor itself.
    if tensor.get_shape().dims is None:
        return tf.shape(tensor)
    static_dims = tensor.get_shape().as_list()
    dynamic_dims = tf.shape(tensor)
    return [
        dynamic_dims[axis] if size is None else size
        for axis, size in enumerate(static_dims)
    ]
def get_timing_signal_1d(length,
                         channels,
                         min_timescale=1.0,
                         max_timescale=1.0e4,
                         start_index=0):
    """Gets a bunch of sinusoids of different frequencies.

    Each channel of the input Tensor is incremented by a sinusoid of a
    different frequency and phase.

    This allows attention to learn to use absolute and relative positions.
    Timing signals should be added to some precursors of both the query and
    the memory inputs to attention.

    The use of relative position is possible because sin(x+y) and cos(x+y)
    can be expressed in terms of y, sin(x) and cos(x).

    In particular, we use a geometric sequence of timescales starting with
    min_timescale and ending with max_timescale. The number of different
    timescales is equal to channels / 2. For each timescale, we generate the
    two sinusoidal signals sin(timestep/timescale) and
    cos(timestep/timescale). All of these sinusoids are concatenated in the
    channels dimension.

    Args:
      length: scalar, length of timing signal sequence.
      channels: scalar, size of timing embeddings to create. The number of
        different timescales is equal to channels / 2.
      min_timescale: a float
      max_timescale: a float
      start_index: index of first position

    Returns:
      a Tensor of timing signals [1, length, channels]
    """
    position = tf.to_float(tf.range(length) + start_index)
    num_timescales = channels // 2
    # Clamp the denominator to at least 1. With a single timescale
    # (`channels` of 2 or 3) `num_timescales - 1` is 0, which would make the
    # increment infinite and turn the whole signal into NaN.
    log_timescale_increment = (
        math.log(float(max_timescale) / float(min_timescale)) /
        tf.maximum(tf.to_float(num_timescales) - 1, 1))
    inv_timescales = min_timescale * tf.exp(
        tf.to_float(tf.range(num_timescales)) * -log_timescale_increment)
    # Outer product: one row per position, one column per timescale.
    scaled_time = (tf.expand_dims(position, 1) *
                   tf.expand_dims(inv_timescales, 0))
    signal = tf.concat([tf.sin(scaled_time), tf.cos(scaled_time)], axis=1)
    # An odd `channels` leaves one column unfilled; pad it with zeros.
    signal = tf.pad(signal, [[0, 0], [0, tf.mod(channels, 2)]])
    signal = tf.reshape(signal, [1, length, channels])
    return signal
def add_timing_signal_1d(x,
                         min_timescale=1.0,
                         max_timescale=1.0e4,
                         start_index=0):
    """Add the sinusoidal timing signal to `x`.

    The signal is produced by `get_timing_signal_1d` for the sequence length
    and channel count read from `x` (axes 1 and 2 respectively) and is then
    added to `x`.

    Args:
      x: a Tensor with shape [batch, length, channels]
      min_timescale: a float
      max_timescale: a float
      start_index: index of first position

    Returns:
      a Tensor the same shape as x.
    """
    dims = shape_list(x)
    timing = get_timing_signal_1d(
        dims[1],
        dims[2],
        min_timescale,
        max_timescale,
        start_index)
    return x + timing
def get_activation(activation_string):
    """Resolve an activation name to the function implementing it.

    Args:
      activation_string: String name of the activation function
        (case-insensitive), or any non-string object.

    Returns:
      The activation function for the given name. None when
      `activation_string` is None, empty, or "linear". Non-string inputs are
      returned untouched.

    Raises:
      ValueError: The `activation_string` does not correspond to a known
        activation.
    """
    # Anything that is not a string is taken to be an activation function
    # already and is passed straight through.
    if not isinstance(activation_string, six.string_types):
        return activation_string

    if not activation_string:
        return None

    act = activation_string.lower()
    if act == "linear":
        return None

    known_activations = {
        "relu": tf.nn.relu,
        "gelu": gelu,
        "tanh": tf.tanh,
    }
    if act not in known_activations:
        raise ValueError("Unsupported activation: %s" % act)
    return known_activations[act]
def dropout(input_tensor, dropout_prob):
    """Apply dropout, expressed as a drop probability.

    Args:
      input_tensor: float Tensor.
      dropout_prob: Python float. The probability of dropping out a value
        (NOT of *keeping* a dimension as in `tf.nn.dropout`).

    Returns:
      `input_tensor` itself when `dropout_prob` is None or 0.0, otherwise a
      version of `input_tensor` with dropout applied.
    """
    is_noop = dropout_prob is None or dropout_prob == 0.0
    if not is_noop:
        # `tf.nn.dropout` takes the keep probability, hence the complement.
        return tf.nn.dropout(input_tensor, 1.0 - dropout_prob)
    return input_tensor
def layer_norm(input_tensor, name=None):
    """Apply layer normalization over the last dimension of `input_tensor`.

    Args:
      input_tensor: Tensor to normalize.
      name: optional variable scope passed to the underlying layer.

    Returns:
      The normalized Tensor.
    """
    normalized = tf.contrib.layers.layer_norm(
        inputs=input_tensor,
        begin_norm_axis=-1,
        begin_params_axis=-1,
        scope=name)
    return normalized
def layer_norm_and_dropout(input_tensor, dropout_prob, name=None):
    """Layer-normalize `input_tensor`, then apply dropout to the result.

    Args:
      input_tensor: Tensor to process.
      dropout_prob: probability of dropping a value (see `dropout`).
      name: optional scope forwarded to `layer_norm`.

    Returns:
      The normalized tensor with dropout applied.
    """
    return dropout(layer_norm(input_tensor, name), dropout_prob)
def create_initializer(initializer_range=0.02):
    """Build a `truncated_normal_initializer` with stddev `initializer_range`."""
    stddev = initializer_range
    return tf.truncated_normal_initializer(stddev=stddev)
def reshape_to_matrix(input_tensor):
    """Flatten a tensor of rank >= 2 into a rank 2 tensor (i.e., a matrix).

    The last dimension is kept; all leading dimensions are merged. A rank 2
    input is returned unchanged.

    Raises:
      ValueError: if the input has rank lower than 2.
    """
    rank = input_tensor.shape.ndims
    if rank < 2:
        raise ValueError("Input tensor must have at least rank 2. Shape = %s" %
                         (input_tensor.shape))
    if rank != 2:
        last_dim = input_tensor.shape[-1]
        return tf.reshape(input_tensor, [-1, last_dim])
    return input_tensor
def reshape_from_matrix(output_tensor, orig_shape_list):
    """Restore a rank 2 tensor to its original rank >= 2 shape.

    Args:
      output_tensor: the matrix to reshape.
      orig_shape_list: list of the original dimensions. Only the leading
        dimensions are reused; the last dimension is taken from
        `output_tensor`.

    Returns:
      `output_tensor` unchanged when the original shape had two dimensions,
      otherwise the reshaped tensor.
    """
    if len(orig_shape_list) != 2:
        last_dim = shape_list(output_tensor)[-1]
        leading_dims = orig_shape_list[:-1]
        return tf.reshape(output_tensor, leading_dims + [last_dim])
    return output_tensor
def assert_rank(tensor, expected_rank, name=None):
    """Check that `tensor` has one of the accepted ranks.

    Args:
      tensor: A tf.Tensor to check the rank of.
      expected_rank: Python integer or list of integers, expected rank.
      name: Optional name of the tensor for the error message; defaults to
        `tensor.name`.

    Raises:
      ValueError: If the expected shape doesn't match the actual shape.
    """
    if name is None:
        name = tensor.name

    # Normalise a single int and an iterable of ints to the same container.
    if isinstance(expected_rank, six.integer_types):
        accepted_ranks = {expected_rank}
    else:
        accepted_ranks = set(expected_rank)

    actual_rank = tensor.shape.ndims
    if actual_rank in accepted_ranks:
        return

    scope_name = tf.get_variable_scope().name
    raise ValueError(
        "For the tensor `%s` in scope `%s`, the actual rank "
        "`%d` (shape = %s) is not equal to the expected rank `%s`" %
        (name, scope_name, actual_rank, str(tensor.shape), str(expected_rank)))
def get_shape_list(tensor, expected_rank=None, name=None):
    """Return the shape of `tensor` as a list, preferring static dimensions.

    Args:
      tensor: A tf.Tensor object to find the shape of.
      expected_rank: (optional) int or list of ints. When given, the rank of
        `tensor` is validated with `assert_rank` first.
      name: Optional name of the tensor for the error message; defaults to
        `tensor.name`.

    Returns:
      A list of dimensions of the shape of tensor. All static dimensions will
      be returned as python integers, and dynamic dimensions will be returned
      as tf.Tensor scalars.
    """
    if name is None:
        name = tensor.name

    if expected_rank is not None:
        assert_rank(tensor, expected_rank, name)

    dims = tensor.shape.as_list()
    dynamic_positions = [pos for pos, size in enumerate(dims) if size is None]

    # Only build the dynamic shape op when at least one size is unknown.
    if dynamic_positions:
        runtime_shape = tf.shape(tensor)
        for pos in dynamic_positions:
            dims[pos] = runtime_shape[pos]
    return dims
def attention_layer(from_tensor,
                    to_tensor,
                    attention_mask=None,
                    num_attention_heads=1,
                    size_per_head=10,
                    query_act=None,
                    key_act=None,
                    value_act=None,
                    attention_probs_dropout_prob=0.0,
                    initializer_range=0.02,
                    do_return_2d_tensor=False,
                    batch_size=None,
                    from_seq_length=None,
                    to_seq_length=None):
    """Performs multi-headed attention from `from_tensor` to `to_tensor`.

    This is an implementation of multi-headed attention based on "Attention
    is all you Need". If `from_tensor` and `to_tensor` are the same, then
    this is self-attention. Each timestep in `from_tensor` attends to the
    corresponding sequence in `to_tensor`, and returns a fixed-width vector.

    This function first projects `from_tensor` into a "query" tensor and
    `to_tensor` into "key" and "value" tensors. These are (effectively) a list
    of tensors of length `num_attention_heads`, where each tensor is of shape
    [batch_size, seq_length, size_per_head].

    Then, the query and key tensors are dot-producted and scaled. These are
    softmaxed to obtain attention probabilities. The value tensors are then
    interpolated by these probabilities, then concatenated back to a single
    tensor and returned.

    In practice, the multi-headed attention are done with transposes and
    reshapes rather than actual separate tensors.

    Args:
      from_tensor: float Tensor of shape [batch_size, from_seq_length,
        from_width].
      to_tensor: float Tensor of shape [batch_size, to_seq_length, to_width].
      attention_mask: (optional) int32 Tensor of shape [batch_size,
        from_seq_length, to_seq_length]. The values should be 1 or 0. The
        attention scores will effectively be set to -infinity for any
        positions in the mask that are 0, and will be unchanged for positions
        that are 1.
      num_attention_heads: int. Number of attention heads.
      size_per_head: int. Size of each attention head.
      query_act: (optional) Activation function for the query transform.
      key_act: (optional) Activation function for the key transform.
      value_act: (optional) Activation function for the value transform.
      attention_probs_dropout_prob: (optional) float. Dropout probability of
        the attention probabilities.
      initializer_range: float. Range of the weight initializer.
      do_return_2d_tensor: bool. If True, the output will be of shape
        [batch_size * from_seq_length, num_attention_heads * size_per_head].
        If False, the output will be of shape [batch_size, from_seq_length,
        num_attention_heads * size_per_head].
      batch_size: (Optional) int. If the input is 2D, this might be the batch
        size of the 3D version of the `from_tensor` and `to_tensor`.
      from_seq_length: (Optional) If the input is 2D, this might be the seq
        length of the 3D version of the `from_tensor`.
      to_seq_length: (Optional) If the input is 2D, this might be the seq
        length of the 3D version of the `to_tensor`.

    Returns:
      float Tensor of shape [batch_size, from_seq_length,
        num_attention_heads * size_per_head]. (If `do_return_2d_tensor` is
        true, this will be of shape [batch_size * from_seq_length,
        num_attention_heads * size_per_head]).

    Raises:
      ValueError: Any of the arguments or tensor shapes are invalid.
    """

    def transpose_for_scores(input_tensor, batch_size, num_attention_heads,
                             seq_length, width):
        # [B*S, N*H] -> [B, S, N, H] -> [B, N, S, H]
        output_tensor = tf.reshape(
            input_tensor, [batch_size, seq_length, num_attention_heads, width])
        return tf.transpose(output_tensor, [0, 2, 1, 3])

    from_shape = get_shape_list(from_tensor, expected_rank=[2, 3])
    to_shape = get_shape_list(to_tensor, expected_rank=[2, 3])

    if len(from_shape) != len(to_shape):
        raise ValueError(
            "The rank of `from_tensor` must match the rank of `to_tensor`.")

    if len(from_shape) == 3:
        batch_size = from_shape[0]
        from_seq_length = from_shape[1]
        to_seq_length = to_shape[1]
    elif len(from_shape) == 2:
        # A 2D input has lost its batch/sequence split, so the caller has to
        # supply it explicitly.
        if (batch_size is None or from_seq_length is None or
                to_seq_length is None):
            raise ValueError(
                "When passing in rank 2 tensors to attention_layer, the values "
                "for `batch_size`, `from_seq_length`, and `to_seq_length` "
                "must all be specified.")

    # Scalar dimensions referenced here:
    #   B = batch size (number of sequences)
    #   F = `from_tensor` sequence length
    #   T = `to_tensor` sequence length
    #   N = `num_attention_heads`
    #   H = `size_per_head`

    from_tensor_2d = reshape_to_matrix(from_tensor)
    to_tensor_2d = reshape_to_matrix(to_tensor)

    # `query_layer` = [B*F, N*H]
    query_layer = tf.layers.dense(
        from_tensor_2d,
        num_attention_heads * size_per_head,
        activation=query_act,
        name="query",
        kernel_initializer=create_initializer(initializer_range))

    # `key_layer` = [B*T, N*H]
    key_layer = tf.layers.dense(
        to_tensor_2d,
        num_attention_heads * size_per_head,
        activation=key_act,
        name="key",
        kernel_initializer=create_initializer(initializer_range))

    # `value_layer` = [B*T, N*H]
    value_layer = tf.layers.dense(
        to_tensor_2d,
        num_attention_heads * size_per_head,
        activation=value_act,
        name="value",
        kernel_initializer=create_initializer(initializer_range))

    # `query_layer` = [B, N, F, H]
    query_layer = transpose_for_scores(query_layer, batch_size,
                                       num_attention_heads, from_seq_length,
                                       size_per_head)

    # `key_layer` = [B, N, T, H]
    key_layer = transpose_for_scores(key_layer, batch_size,
                                     num_attention_heads, to_seq_length,
                                     size_per_head)

    # Take the dot product between "query" and "key" to get the raw
    # attention scores.
    # `attention_scores` = [B, N, F, T]
    attention_scores = tf.matmul(query_layer, key_layer, transpose_b=True)
    attention_scores = tf.multiply(attention_scores,
                                   1.0 / math.sqrt(float(size_per_head)))

    if attention_mask is not None:
        # `attention_mask` = [B, 1, F, T]
        attention_mask = tf.expand_dims(attention_mask, axis=[1])

        # Since attention_mask is 1.0 for positions we want to attend and 0.0
        # for masked positions, this operation will create a tensor which is
        # 0.0 for positions we want to attend and -10000.0 for masked
        # positions.
        adder = (1.0 - tf.cast(attention_mask, tf.float32)) * -10000.0

        # Since we are adding it to the raw scores before the softmax, this
        # is effectively the same as removing these entirely.
        attention_scores += adder

    # Normalize the attention scores to probabilities.
    # `attention_probs` = [B, N, F, T]
    attention_probs = tf.nn.softmax(attention_scores)

    # This is actually dropping out entire tokens to attend to, which might
    # seem a bit unusual, but is taken from the original Transformer paper.
    attention_probs = dropout(attention_probs, attention_probs_dropout_prob)

    # `value_layer` = [B, N, T, H]
    value_layer = transpose_for_scores(value_layer, batch_size,
                                       num_attention_heads, to_seq_length,
                                       size_per_head)

    # `context_layer` = [B, N, F, H]
    context_layer = tf.matmul(attention_probs, value_layer)

    # `context_layer` = [B, F, N, H]
    context_layer = tf.transpose(context_layer, [0, 2, 1, 3])

    if do_return_2d_tensor:
        # `context_layer` = [B*F, N*H]
        context_layer = tf.reshape(
            context_layer,
            [batch_size * from_seq_length, num_attention_heads * size_per_head])
    else:
        # `context_layer` = [B, F, N*H]
        context_layer = tf.reshape(
            context_layer,
            [batch_size, from_seq_length, num_attention_heads * size_per_head])

    return context_layer
def transformer_model(input_tensor,
                      attention_mask=None,
                      hidden_size=128,
                      num_hidden_layers=4,
                      num_attention_heads=1,
                      intermediate_size=256,
                      intermediate_act_fn=gelu,
                      hidden_dropout_prob=0.1,
                      attention_probs_dropout_prob=0.1,
                      initializer_range=0.02,
                      do_return_all_layers=False):
    """Run a stack of self-attention + feed-forward encoder layers.

    Each layer applies `attention_layer` as self-attention, a dense
    projection back to `hidden_size` with a residual connection and layer
    normalization, then a two-layer feed-forward block with a second
    residual connection and layer normalization.

    Args:
      input_tensor: rank 3 Tensor whose last dimension equals `hidden_size`.
      attention_mask: (optional) mask forwarded to `attention_layer`.
      hidden_size: int. Width of every layer's output.
      num_hidden_layers: int. Number of stacked layers.
      num_attention_heads: int. Must divide `hidden_size`.
      intermediate_size: int. Width of the feed-forward hidden layer.
      intermediate_act_fn: activation applied to the feed-forward hidden
        layer only.
      hidden_dropout_prob: float. Drop probability after each projection.
      attention_probs_dropout_prob: float. Drop probability for the
        attention probabilities.
      initializer_range: float. Stddev of the weight initializer.
      do_return_all_layers: bool. Return every layer's output instead of
        only the last one.

    Returns:
      The last layer's output reshaped to the input's shape, or a list with
      one such tensor per layer when `do_return_all_layers` is True.

    Raises:
      ValueError: `hidden_size` is not a multiple of `num_attention_heads`,
        or the input width differs from `hidden_size`.
    """
    # NOTE(review): the positional timing signal (`add_timing_signal_1d`) is
    # not applied to `input_tensor` in this version.
    # NOTE(review): `dropout` has no train/inference switch here; callers
    # presumably pass 0.0 probabilities at inference time - verify.
    if hidden_size % num_attention_heads != 0:
        raise ValueError(
            "The hidden size (%d) is not a multiple of the number of attention "
            "heads (%d)" % (hidden_size, num_attention_heads))
    attention_head_size = int(hidden_size / num_attention_heads)

    input_shape = get_shape_list(input_tensor, expected_rank=3)
    batch_size, seq_length, input_width = input_shape

    # Residual sums on every layer require the input width to already match
    # the hidden size.
    if input_width != hidden_size:
        raise ValueError("The width of the input tensor (%d) != hidden size (%d)" %
                         (input_width, hidden_size))

    # The running representation stays 2D for the whole stack so that it is
    # not reshaped back and forth between 2D and 3D on every layer.
    hidden = reshape_to_matrix(input_tensor)
    per_layer_outputs = []

    with tf.variable_scope("encoder", reuse=tf.AUTO_REUSE):
        for layer_idx in range(num_hidden_layers):
            with tf.variable_scope("layer_%d" % layer_idx, reuse=tf.AUTO_REUSE):
                layer_input = hidden

                with tf.variable_scope("attention", reuse=tf.AUTO_REUSE):
                    with tf.variable_scope("self", reuse=tf.AUTO_REUSE):
                        attention_output = attention_layer(
                            from_tensor=layer_input,
                            to_tensor=layer_input,
                            attention_mask=attention_mask,
                            num_attention_heads=num_attention_heads,
                            size_per_head=attention_head_size,
                            attention_probs_dropout_prob=attention_probs_dropout_prob,
                            initializer_range=initializer_range,
                            do_return_2d_tensor=True,
                            batch_size=batch_size,
                            from_seq_length=seq_length,
                            to_seq_length=seq_length)

                    # Linear projection to `hidden_size`, then the residual
                    # with `layer_input`.
                    with tf.variable_scope("output", reuse=tf.AUTO_REUSE):
                        attention_output = tf.layers.dense(
                            attention_output,
                            hidden_size,
                            kernel_initializer=create_initializer(initializer_range))
                        attention_output = dropout(attention_output,
                                                   hidden_dropout_prob)
                        attention_output = layer_norm(attention_output + layer_input)

                # Only the "intermediate" hidden layer gets an activation.
                with tf.variable_scope("intermediate", reuse=tf.AUTO_REUSE):
                    intermediate_output = tf.layers.dense(
                        attention_output,
                        intermediate_size,
                        activation=intermediate_act_fn,
                        kernel_initializer=create_initializer(initializer_range))

                # Project back down to `hidden_size`, then add the residual.
                with tf.variable_scope("output", reuse=tf.AUTO_REUSE):
                    layer_output = tf.layers.dense(
                        intermediate_output,
                        hidden_size,
                        kernel_initializer=create_initializer(initializer_range))
                    layer_output = dropout(layer_output, hidden_dropout_prob)
                    layer_output = layer_norm(layer_output + attention_output)

                hidden = layer_output
                per_layer_outputs.append(layer_output)

    if do_return_all_layers:
        return [reshape_from_matrix(out, input_shape)
                for out in per_layer_outputs]
    return reshape_from_matrix(hidden, input_shape)
class Attention(layers.Layer):
    """Keras layer producing one normalized score per timestep.

    Input:  (BATCH_SIZE, MAX_TIMESTEPS, EMBED_SIZE)
    Output: (BATCH_SIZE, MAX_TIMESTEPS)

    Each timestep is scored as tanh(x . W + b); the scores of a sequence are
    then divided by their sum.
    """

    def __init__(self, **kwargs):
        super(Attention, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the projection weight and the per-timestep bias.

        W: (EMBED_SIZE, 1)
        b: (MAX_TIMESTEPS, 1)
        """
        self.W = self.add_weight(name="W_{:s}".format(self.name),
                                 shape=(input_shape[-1], 1),
                                 initializer="uniform")
        self.b = self.add_weight(name="b_{:s}".format(self.name),
                                 shape=(input_shape[1], 1),
                                 initializer="uniform")
        super(Attention, self).build(input_shape)

    def call(self, x, mask=None):
        """Score every timestep of `x`; `mask` is accepted but not used."""
        # et: (BATCH_SIZE, MAX_TIMESTEPS)
        et = K.squeeze(K.tanh(K.dot(x, self.W) + self.b), axis=-1)
        # Normalize along the time axis; epsilon guards an exactly-zero sum.
        # NOTE(review): tanh scores can be negative, so the sum may be close
        # to zero or negative - confirm a softmax was not intended.
        et /= K.cast(K.sum(et, axis=1, keepdims=True) + K.epsilon(), K.floatx())
        return et

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def compute_output_shape(self, input_shape):
        # output shape: (BATCH_SIZE, MAX_TIMESTEPS)
        return (input_shape[0], input_shape[1])

    def get_config(self):
        return super(Attention, self).get_config()
def getBiRNNModel(input_shape=(None, 36), out_len=2, TRANSFORMER=False):
    """Build and compile the sequence-labelling model.

    Args:
      input_shape: shape of one sample, passed to `layers.Input`.
      out_len: int. Number of units (classes) of the softmax output layer.
      TRANSFORMER: bool. Use the transformer encoder when True, otherwise
        two stacked bidirectional LSTMs.

    Returns:
      The compiled Keras model.
    """
    model_input = layers.Input(shape=input_shape, dtype="float32")
    # No Masking layer is applied; the raw input feeds the encoder directly.
    sequence = model_input

    if TRANSFORMER:
        # The encoder's variables are created inside the Lambda, so (it
        # appears) Keras does not register them by itself: snapshot the
        # trainable variables before the call and attach the newly created
        # ones to the layer by hand.
        names_before = {v.name for v in tf.trainable_variables()}
        transformer_layer = layers.Lambda(
            lambda x: transformer_model(
                x,
                hidden_size=get_shape_list(sequence)[-1],
                do_return_all_layers=False),
            trainable=True)
        global_local_feature = transformer_layer(sequence)
        transformer_layer._trainable_weights = [
            v for v in tf.trainable_variables() if v.name not in names_before
        ]
    else:
        lstm_0 = layers.Bidirectional(
            layers.LSTM(32, return_sequences=True))(sequence)
        lstm_1 = layers.Bidirectional(
            layers.LSTM(12, return_sequences=True))(lstm_0)
        global_local_feature = lstm_1

    # The output width follows `out_len` (default 2) instead of a hard-coded
    # constant, so the parameter actually takes effect.
    output = layers.Dense(out_len, activation="softmax")(global_local_feature)

    model = models.Model(inputs=[model_input], outputs=output)
    model.compile(optimizer=optimizers.Adam(lr=0.001),
                  loss=my_loss,
                  metrics=[precision, recall, f1_score])
    model.summary()
    return model
# Manual smoke test: build the default (bidirectional LSTM) variant, which
# also prints the model summary.
if __name__ == "__main__":
    getBiRNNModel(TRANSFORMER=False)
|