|
|
- import sys
- sys.path.append('../')
- import tensorflow as tf
- try:
- import tensorflow.python.keras as keras
- from tensorflow.python.keras import layers
- import tensorflow.python.keras.backend as K
- except:
- import tensorflow.keras as keras
- from tensorflow.keras import layers
- import tensorflow.keras.backend as K
- from typing import Optional
- from PIPE.config import Config
- from PIPE.keras_attention_layer import Attention_layer
-
-
class Code2VecModel():
    """code2vec path-context encoder with a C2AE-style multi-label head.

    The model consumes bags of (source-token, path, target-token) context
    triples, attention-pools them into a single code vector, and decodes
    that vector into per-category sigmoid scores.  A second "label
    autoencoder" branch embeds the ground-truth label vector into the same
    latent space; the element-wise difference between the two latents is
    exposed as an extra model output (`targets_loss`) and driven toward
    zero by a custom loss at compile time.
    """

    def __init__(self, config: Config):
        # Built lazily by _create_keras_model(); None until then.
        self.keras_train_model: Optional[keras.Model] = None
        self.config = config

    # ------------------------ model construction ------------------------
    def _create_keras_model(self):
        """Assemble the Keras graph and store it in ``self.keras_train_model``.

        Inputs (all shaped (batch, MAX_CONTEXTS) unless noted):
          * path_source_token_input / path_target_token_input -- token ids
          * path_input                                        -- path ids
          * context_valid_mask -- 1/0 mask of real vs. padded contexts
          * targets_input      -- (batch, categories) ground-truth label
                                  vector, consumed by the label branch
        Outputs:
          * target_index_3 -- per-category sigmoid predictions
          * targets_loss   -- latent-space difference tensor (trained to ~0)
        """
        path_source_token_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        path_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        path_target_token_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        context_valid_mask = layers.Input((self.config.MAX_CONTEXTS,))

        # Path embedding:
        # (batch, MAX_CONTEXTS) -> (batch, MAX_CONTEXTS, PATH_EMBEDDINGS_SIZE)
        paths_embedded = layers.Embedding(
            self.config.path_vocab_size, self.config.PATH_EMBEDDINGS_SIZE, name='path_embedding'
        )(path_input)

        # Terminal-token embedding, shared by source and target tokens:
        # (batch, MAX_CONTEXTS) -> (batch, MAX_CONTEXTS, TOKEN_EMBEDDINGS_SIZE)
        token_embedding_shared_layer = layers.Embedding(
            self.config.token_vocab_size, self.config.TOKEN_EMBEDDINGS_SIZE, name='token_embedding'
        )
        path_source_token_embedded = token_embedding_shared_layer(path_source_token_input)
        path_target_token_embedded = token_embedding_shared_layer(path_target_token_input)

        # Each context becomes [source ; path ; target]:
        # -> (batch, MAX_CONTEXTS, 2*TOKEN_EMBEDDINGS_SIZE + PATH_EMBEDDINGS_SIZE)
        context_embedded = layers.Concatenate()(
            [path_source_token_embedded, paths_embedded, path_target_token_embedded]
        )
        # Config stores a KEEP rate; keras.layers.Dropout expects a DROP rate.
        context_embedded = layers.Dropout(1 - self.config.DROPOUT_KEEP_RATE)(context_embedded)

        # Per-context dense projection:
        # (batch, MAX_CONTEXTS, concat) -> (batch, MAX_CONTEXTS, CODE_VECTOR_SIZE)
        context_after_dense = layers.TimeDistributed(
            layers.Dense(self.config.CODE_VECTOR_SIZE, use_bias=False, activation='tanh')
        )(context_embedded)

        # Attention-pool the contexts into one code vector:
        # (batch, MAX_CONTEXTS, CODE_VECTOR_SIZE) -> (batch, CODE_VECTOR_SIZE)
        code_vectors, attention_weights = Attention_layer(name='attention')(
            [context_after_dense, context_valid_mask]
        )

        # -------------------- C2AE classification head --------------------
        latent_dim = 186  # size of the shared feature/label latent space

        # Fx: encode the code vector into the latent space.
        Fx = layers.Dense(
            latent_dim, use_bias=True, activation=None, name='Fx'
        )(code_vectors)
        Fx_relu = tf.tanh(Fx)
        Fx_dropout = layers.Dropout(0.5)(Fx_relu)

        # Fe: encode the ground-truth label vector into the same latent
        # space (two tanh + dropout dense stages).
        targets_input = layers.Input((self.config.categories,), dtype=tf.float32)
        targets_hidden = layers.Dense(
            latent_dim, use_bias=True, activation=None, name='targets_hidden'
        )(targets_input)
        targets_hidden_relu = tf.tanh(targets_hidden)
        targets_hidden_dropout = layers.Dropout(0.5)(targets_hidden_relu)
        targets_output = layers.Dense(
            latent_dim, use_bias=True, activation=None, name='targets_embedding'
        )(targets_hidden_dropout)
        targets_output_relu = tf.tanh(targets_output)
        targets_output_dropout = layers.Dropout(0.5)(targets_output_relu)

        # Latent alignment term: label latent minus code latent; its squared
        # magnitude is penalized by `custom_loss` at compile time.
        targets_loss = layers.subtract([targets_output_dropout, Fx_dropout], name='targets_loss')

        # Fd: decode the code-side latent into category probabilities
        # (two tanh + dropout dense stages, then a sigmoid output layer).
        Fd1 = layers.Dense(
            latent_dim, use_bias=True, activation=None, name='Fd1'
        )(Fx_dropout)
        Fd_relu1 = tf.tanh(Fd1)
        Fd_dropout1 = layers.Dropout(0.5)(Fd_relu1)
        Fd = layers.Dense(
            latent_dim, use_bias=True, activation=None, name='Fd'
        )(Fd_dropout1)
        Fd_relu = tf.tanh(Fd)
        Fd_dropout = layers.Dropout(0.5)(Fd_relu)
        target_index_3 = layers.Dense(
            self.config.categories, use_bias=True, activation='sigmoid', name='target_index_3'
        )(Fd_dropout)

        inputs = [path_source_token_input, path_input, path_target_token_input, context_valid_mask, targets_input]
        self.keras_train_model = keras.Model(inputs=inputs, outputs=[target_index_3, targets_loss])

        print("------------------create_keras_model Done.-------------------------")

    def _create_optimizer(self):
        """Return the Adam optimizer used for training.

        NOTE(review): this was erroneously decorated with @classmethod while
        declaring `self` as its first parameter; it is called as an instance
        method, so the decorator has been removed.  `lr`/`decay` are the
        TF1-era argument names, kept for compatibility with the
        tensorflow.python.keras fallback import at the top of the file.
        """
        return tf.keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)

    def _comile_keras_model(self, optimizer=None):
        """Compile ``self.keras_train_model`` with the C2AE losses/metrics.

        (Method name keeps its historical spelling so existing callers
        keep working.)  Metrics are attached only to the classification
        output; the `targets_loss` output is trained purely by its loss.
        """
        if optimizer is None:
            optimizer = self._create_optimizer()

        # ----------------------- evaluation metrics -----------------------
        def exactMatch(y_true, y_pred):
            # Fraction of samples whose entire (rounded) label vector
            # matches the ground truth exactly.
            y_pred1 = y_pred
            row_dif = K.cast(K.sum(K.round(K.clip(y_true * (1 - y_pred1) + (1-y_true) * y_pred1,0,1)), axis=1) > K.epsilon(),'float32')
            dif = K.sum(K.round(row_dif))
            row_equ = K.cast(K.abs(K.sum(K.round(K.clip(y_true * y_pred1 + (1-y_true) * (1 - y_pred1),0,1)), axis=1) - self.config.categories) < K.epsilon(),'float32')
            equ = K.sum(K.round(row_equ))
            return equ / (equ + dif + K.epsilon())

        def micro_getPrecsion(y_true, y_pred):
            # Micro-averaged precision: counts pooled over all labels.
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
            return TP / (TP + FP)

        def micro_getRecall(y_true, y_pred):
            # Micro-averaged recall: counts pooled over all labels.
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
            return TP / (TP + FN)

        def micro_F1score(y_true, y_pred):
            # Micro-averaged F1 from pooled precision/recall.
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
            FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
            precision = TP / (TP + FP)
            recall = TP / (TP + FN)
            return 2 * precision * recall / (precision + recall)

        def macro_getPrecison(y_true, y_pred):
            # Macro-averaged precision: per-category, then averaged.
            col_TP = K.sum(y_true * K.round(y_pred), axis=0)
            col_FP = K.sum((1 - y_true) * K.round(y_pred), axis=0)
            return K.mean(col_TP / (col_TP + col_FP + K.epsilon()))

        def macro_getRecall(y_true, y_pred):
            # Macro-averaged recall: per-category, then averaged.
            row_TP = K.sum(y_true * K.round(y_pred), axis=0)
            row_FN = K.sum(y_true * (1 - K.round(y_pred)), axis=0)
            return K.mean(row_TP / (row_TP + row_FN + K.epsilon()))

        def macro_getF1score(y_true, y_pred):
            # Macro-averaged F1 from macro precision/recall.
            precision = macro_getPrecison(y_true, y_pred)
            recall = macro_getRecall(y_true, y_pred)
            return 2 * precision * recall / (precision + recall)

        # C2AE loss functions:
        #   custom_loss  -- mean squared magnitude of the model's
        #                   `targets_loss` output (y_true is a dummy).
        #   a_cross_loss -- scaled binary cross-entropy of the predictions.
        def custom_loss(y_true, y_pred):
            return 1*tf.reduce_mean(tf.square(y_pred))

        def a_cross_loss(y_true, y_pred):
            # tf.math.log (not the TF1-only tf.log) so the TF2 import path
            # at the top of the file also works; 1e-10 guards log(0).
            cross_loss = tf.add(tf.math.log(1e-10 + y_pred) * y_true, tf.math.log(1e-10 + (1 - y_pred)) * (1 - y_true))
            cross_entropy_label = -1 * tf.reduce_mean(tf.reduce_sum(cross_loss, 1))
            return 0.1*cross_entropy_label

        self.keras_train_model.compile(
            loss = {'target_index_3':a_cross_loss,'targets_loss':custom_loss},
            optimizer=optimizer,
            metrics={'target_index_3':[exactMatch, micro_getPrecsion, micro_getRecall,micro_F1score,macro_getPrecison,macro_getRecall,macro_getF1score]}
        )
-
- if __name__ == "__main__":
- config = Config(set_defaults=True, load_from_args=True, verify=False)
- model = Code2VecModel(config)
- model._create_keras_model()
-
|