import sys

sys.path.append('../')

import tensorflow as tf

try:
    # Older TF releases expose keras under tensorflow.python.keras.
    import tensorflow.python.keras as keras
    from tensorflow.python.keras import layers
    import tensorflow.python.keras.backend as K
except ImportError:
    import tensorflow.keras as keras
    from tensorflow.keras import layers
    import tensorflow.keras.backend as K

from typing import Optional

from PIPE.config import Config
from PIPE.keras_attention_layer import Attention_layer

# Width of the shared latent space in which the code vector (Fx) and the
# label embedding (Fe) are aligned by the C2AE objective.
C2AE_LATENT_DIM = 186


class Code2VecModel:
    """code2vec path-context encoder with a C2AE multi-label classification head.

    The model embeds (source token, path, target token) triples, attends over
    the contexts to form one code vector, and feeds it through a C2AE-style
    autoencoder head that aligns code vectors with label embeddings.
    """

    def __init__(self, config: Config):
        # Built lazily by _create_keras_model().
        self.keras_train_model: Optional[keras.Model] = None
        self.config = config

    # ------------------------- model construction ------------------------- #
    def _create_keras_model(self):
        """Build ``self.keras_train_model`` (functional Keras model).

        Inputs:  source tokens, paths, target tokens, context validity mask,
                 and the ground-truth label vector (used by the C2AE branch).
        Outputs: ``target_index_3`` (sigmoid multi-label predictions) and
                 ``targets_loss`` (latent-space residual minimized by a
                 custom loss at compile time).
        """
        path_source_token_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        path_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        path_target_token_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        context_valid_mask = layers.Input((self.config.MAX_CONTEXTS,))

        # Path embedding: (None, max_contexts) -> (None, max_contexts, path_embedding_size)
        paths_embedded = layers.Embedding(
            self.config.path_vocab_size, self.config.PATH_EMBEDDINGS_SIZE,
            name='path_embedding'
        )(path_input)

        # Terminal-token embedding, shared between source and target tokens:
        # (None, max_contexts) -> (None, max_contexts, token_embedding_size)
        token_embedding_shared_layer = layers.Embedding(
            self.config.token_vocab_size, self.config.TOKEN_EMBEDDINGS_SIZE,
            name='token_embedding'
        )
        path_source_token_embedded = token_embedding_shared_layer(path_source_token_input)
        path_target_token_embedded = token_embedding_shared_layer(path_target_token_input)

        # Each context becomes [source; path; target]:
        # 3 x (None, max_contexts, emb) -> (None, max_contexts, 3 * emb)
        context_embedded = layers.Concatenate()(
            [path_source_token_embedded, paths_embedded, path_target_token_embedded])
        context_embedded = layers.Dropout(1 - self.config.DROPOUT_KEEP_RATE)(context_embedded)

        # Per-context projection:
        # (None, max_contexts, 3 * emb) -> (None, max_contexts, code_vector_size)
        context_after_dense = layers.TimeDistributed(
            layers.Dense(self.config.CODE_VECTOR_SIZE, use_bias=False, activation='tanh')
        )(context_embedded)

        # Attention pooling over contexts:
        # (None, max_contexts, code_vector_size) -> (None, code_vector_size)
        code_vectors, attention_weights = Attention_layer(name='attention')(
            [context_after_dense, context_valid_mask]
        )

        # --------------------- C2AE classification head --------------------- #
        # Fx: project the code vector into the shared latent space.
        Fx = layers.Dense(
            C2AE_LATENT_DIM, use_bias=True, activation=None, name='Fx'
        )(code_vectors)
        Fx_relu = tf.tanh(Fx)
        Fx_dropout = layers.Dropout(0.5)(Fx_relu)

        # Fe: embed the ground-truth label vector into the same latent space.
        targets_input = layers.Input((self.config.categories,), dtype=tf.float32)
        targets_hidden = layers.Dense(
            C2AE_LATENT_DIM, use_bias=True, activation=None, name='targets_hidden'
        )(targets_input)
        targets_hidden_relu = tf.tanh(targets_hidden)
        targets_hidden_dropout = layers.Dropout(0.5)(targets_hidden_relu)
        targets_output = layers.Dense(
            C2AE_LATENT_DIM, use_bias=True, activation=None, name='targets_embedding'
        )(targets_hidden_dropout)
        targets_output_relu = tf.tanh(targets_output)
        targets_output_dropout = layers.Dropout(0.5)(targets_output_relu)

        # Latent residual Fe(y) - Fx(x); its squared mean is minimized by
        # custom_loss in _comile_keras_model.
        targets_loss = layers.subtract([targets_output_dropout, Fx_dropout],
                                       name='targets_loss')

        # Fd: decode the latent code back into label space.
        Fd1 = layers.Dense(
            C2AE_LATENT_DIM, use_bias=True, activation=None, name='Fd1'
        )(Fx_dropout)
        Fd_relu1 = tf.tanh(Fd1)
        Fd_dropout1 = layers.Dropout(0.5)(Fd_relu1)
        Fd = layers.Dense(
            C2AE_LATENT_DIM, use_bias=True, activation=None, name='Fd'
        )(Fd_dropout1)
        Fd_relu = tf.tanh(Fd)
        Fd_dropout = layers.Dropout(0.5)(Fd_relu)

        # Multi-label sigmoid output, one probability per category.
        target_index_3 = layers.Dense(
            self.config.categories, use_bias=True, activation='sigmoid',
            name='target_index_3'
        )(Fd_dropout)

        inputs = [path_source_token_input, path_input, path_target_token_input,
                  context_valid_mask, targets_input]
        self.keras_train_model = keras.Model(
            inputs=inputs, outputs=[target_index_3, targets_loss])

        print("------------------create_keras_model Done.-------------------------")

    @classmethod
    def _create_optimizer(cls):
        """Return the Adam optimizer used for training.

        NOTE(review): `lr` and `decay` are deprecated aliases in newer Keras
        (use `learning_rate`); kept as-is for compatibility with the old
        tensorflow.python.keras import path — confirm against the pinned
        TF version before modernizing.
        """
        return tf.keras.optimizers.Adam(lr=0.001, beta_1=0.9,
                                        beta_2=0.999, epsilon=1e-08, decay=0.0)

    def _comile_keras_model(self, optimizer=None):
        """Compile ``self.keras_train_model`` with C2AE losses and
        multi-label metrics. (Name keeps the original 'comile' typo because
        external callers may depend on it.)

        :param optimizer: optional Keras optimizer; defaults to
            ``_create_optimizer()``.
        """
        if optimizer is None:
            optimizer = self._create_optimizer()

        # ----------------------- evaluation metrics ----------------------- #
        def exactMatch(y_true, y_pred):
            # Fraction of samples whose entire rounded prediction row equals
            # the ground truth (subset accuracy).
            y_pred1 = y_pred
            row_dif = K.cast(
                K.sum(K.round(K.clip(y_true * (1 - y_pred1) + (1 - y_true) * y_pred1, 0, 1)),
                      axis=1) > K.epsilon(), 'float32')
            dif = K.sum(K.round(row_dif))
            row_equ = K.cast(
                K.abs(K.sum(K.round(K.clip(y_true * y_pred1 + (1 - y_true) * (1 - y_pred1), 0, 1)),
                            axis=1) - self.config.categories) < K.epsilon(), 'float32')
            equ = K.sum(K.round(row_equ))
            return equ / (equ + dif + K.epsilon())

        def micro_getPrecsion(y_true, y_pred):
            # Micro-averaged precision: TP / (TP + FP) over all labels pooled.
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
            precision = TP / (TP + FP)
            return precision

        def micro_getRecall(y_true, y_pred):
            # Micro-averaged recall: TP / (TP + FN) over all labels pooled.
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
            recall = TP / (TP + FN)
            return recall

        def micro_F1score(y_true, y_pred):
            # Micro-averaged F1 (harmonic mean of micro precision/recall).
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
            FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
            precision = TP / (TP + FP)
            recall = TP / (TP + FN)
            F1score = 2 * precision * recall / (precision + recall)
            return F1score

        def macro_getPrecison(y_true, y_pred):
            # Macro-averaged precision: per-label precision, then mean.
            col_TP = K.sum(y_true * K.round(y_pred), axis=0)
            col_FP = K.sum((1 - y_true) * K.round(y_pred), axis=0)
            precsion = K.mean(col_TP / (col_TP + col_FP + K.epsilon()))
            return precsion

        def macro_getRecall(y_true, y_pred):
            # Macro-averaged recall: per-label recall, then mean.
            row_TP = K.sum(y_true * K.round(y_pred), axis=0)
            row_FN = K.sum(y_true * (1 - K.round(y_pred)), axis=0)
            recall = K.mean(row_TP / (row_TP + row_FN + K.epsilon()))
            return recall

        def macro_getF1score(y_true, y_pred):
            # Macro-averaged F1 from macro precision/recall.
            precision = macro_getPrecison(y_true, y_pred)
            recall = macro_getRecall(y_true, y_pred)
            F1score = 2 * precision * recall / (precision + recall)
            return F1score

        # C2AE loss functions:
        #   custom_loss  - mean squared norm of the 'targets_loss' residual
        #                  output (y_true is a dummy placeholder).
        #   a_cross_loss - scaled binary cross-entropy on the sigmoid output.
        def custom_loss(y_true, y_pred):
            return 1 * tf.reduce_mean(tf.square(y_pred))

        def a_cross_loss(y_true, y_pred):
            # tf.math.log works on both TF1.x (>=1.5) and TF2; plain tf.log
            # was removed in TF2 and would crash the tensorflow.keras branch.
            cross_loss = tf.add(tf.math.log(1e-10 + y_pred) * y_true,
                                tf.math.log(1e-10 + (1 - y_pred)) * (1 - y_true))
            cross_entropy_label = -1 * tf.reduce_mean(tf.reduce_sum(cross_loss, 1))
            return 0.1 * cross_entropy_label

        self.keras_train_model.compile(
            loss={'target_index_3': a_cross_loss, 'targets_loss': custom_loss},
            optimizer=optimizer,
            metrics={'target_index_3': [exactMatch, micro_getPrecsion,
                                        micro_getRecall, micro_F1score,
                                        macro_getPrecison, macro_getRecall,
                                        macro_getF1score]}
        )


if __name__ == "__main__":
    config = Config(set_defaults=True, load_from_args=True, verify=False)
    model = Code2VecModel(config)
    model._create_keras_model()