Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.
 
 
 

203 строки
8.9 KiB

import sys
sys.path.append('../')
import tensorflow as tf
try:
import tensorflow.python.keras as keras
from tensorflow.python.keras import layers
import tensorflow.python.keras.backend as K
except:
import tensorflow.keras as keras
from tensorflow.keras import layers
import tensorflow.keras.backend as K
from typing import Optional
from PIPE.config import Config
from PIPE.keras_attention_layer import Attention_layer
class Code2VecModel():
def __init__(self,config: Config):
self.keras_train_model: Optional[keras.Model] = None
self.config = config
##################################################搭建模型结构函数########################################################
def _create_keras_model(self):
path_source_token_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
path_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
path_target_token_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
context_valid_mask = layers.Input((self.config.MAX_CONTEXTS,))
# path embedding layer
# (None, max_contents) -> (None,max_contents,path_embedding_size)
paths_embedded = layers.Embedding(
self.config.path_vocab_size, self.config.PATH_EMBEDDINGS_SIZE, name = 'path_embedding'
)(path_input)
# terminal embedding layer
# (None, max_contents) -> (None,max_contents,token_embedding_size)
token_embedding_shared_layer = layers.Embedding(
self.config.token_vocab_size, self.config.TOKEN_EMBEDDINGS_SIZE, name = 'token_embedding'
)
path_source_token_embedded = token_embedding_shared_layer(path_source_token_input)
path_target_token_embedded = token_embedding_shared_layer(path_target_token_input)
# concatenate layer: paths -> [source, path, target]
# [3 * (None,max_contents, token_embedding_size)] -> (None, max_contents,3*embedding_size)
context_embedded = layers.Concatenate()([path_source_token_embedded, paths_embedded, path_target_token_embedded])
context_embedded = layers.Dropout(1 - self.config.DROPOUT_KEEP_RATE)(context_embedded)
# Dense layer: (None,max_contents,3*embedding_size) -> (None,max_contents, code_vector_size)
context_after_dense = layers.TimeDistributed(
layers.Dense(self.config.CODE_VECTOR_SIZE, use_bias=False, activation='tanh')
)(context_embedded)
# attention layer: (None, max_contents,code_vector_size) -> (None,code_vector_size)
code_vectors, attention_weights = Attention_layer(name='attention')(
[context_after_dense, context_valid_mask]
)
"""
下面是用C2AE分类器进行分类的模型
"""
Fx = layers.Dense(
186 , use_bias=True,activation=None,name='Fx'
)(code_vectors)
Fx_relu = tf.tanh(Fx)
Fx_dropout = layers.Dropout(0.5)(Fx_relu)
targets_input = layers.Input((self.config.categories,), dtype=tf.float32)
targets_hidden = layers.Dense(
186, use_bias=True,activation=None,name='targets_hidden'
)(targets_input)
targets_hidden_relu = tf.tanh(targets_hidden)
targets_hidden_dropout = layers.Dropout(0.5)(targets_hidden_relu)
targets_output = layers.Dense(
186, use_bias=True,activation=None,name='targets_embedding'
)(targets_hidden_dropout)
targets_output_relu = tf.tanh(targets_output)
targets_output_dropout = layers.Dropout(0.5)(targets_output_relu)
targets_loss = layers.subtract([targets_output_dropout,Fx_dropout],name='targets_loss')
Fd1 = layers.Dense(
186, use_bias=True,activation=None,name='Fd1'
)(Fx_dropout)
Fd_relu1 = tf.tanh(Fd1)
Fd_dropout1 = layers.Dropout(0.5)(Fd_relu1)
Fd = layers.Dense(
186, use_bias=True, activation=None, name='Fd'
)(Fd_dropout1)
Fd_relu = tf.tanh(Fd)
Fd_dropout = layers.Dropout(0.5)(Fd_relu)
target_index_3 = layers.Dense(
self.config.categories, use_bias=True,activation='sigmoid',name='target_index_3'
)(Fd_dropout)
inputs = [path_source_token_input, path_input, path_target_token_input, context_valid_mask, targets_input]
self.keras_train_model = keras.Model(inputs = inputs, outputs = [target_index_3,targets_loss])
print("------------------create_keras_model Done.-------------------------")
@classmethod
def _create_optimizer(self):
return tf.keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
def _comile_keras_model(self, optimizer=None):
if optimizer is None:
optimizer = self._create_optimizer()
################################评估指标##############################
def exactMatch(y_true, y_pred):
y_pred1 = y_pred
row_dif = K.cast(K.sum(K.round(K.clip(y_true * (1 - y_pred1) + (1-y_true) * y_pred1,0,1)), axis=1) > K.epsilon(),'float32')
dif = K.sum(K.round(row_dif))
row_equ = K.cast(K.abs(K.sum(K.round(K.clip(y_true * y_pred1 + (1-y_true) * (1 - y_pred1),0,1)), axis=1) - self.config.categories) < K.epsilon(),'float32')
equ = K.sum(K.round(row_equ))
return equ / (equ + dif + K.epsilon())
def micro_getPrecsion(y_true, y_pred):
TP = tf.reduce_sum(y_true * tf.round(y_pred))
TN = tf.reduce_sum((1 - y_true) * (1 - tf.round(y_pred)))
FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
precision = TP / (TP + FP)
return precision
def micro_getRecall(y_true, y_pred):
TP = tf.reduce_sum(y_true * tf.round(y_pred))
TN = tf.reduce_sum((1 - y_true) * (1 - tf.round(y_pred)))
FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
precision = TP / (TP + FP)
recall = TP / (TP + FN)
return recall
# F1-score评价指标
def micro_F1score(y_true, y_pred):
TP = tf.reduce_sum(y_true * tf.round(y_pred))
TN = tf.reduce_sum((1 - y_true) * (1 - tf.round(y_pred)))
FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
precision = TP / (TP + FP)
recall = TP / (TP + FN)
F1score = 2 * precision * recall / (precision + recall)
return F1score
def macro_getPrecison(y_true, y_pred):
col_TP = K.sum(y_true * K.round(y_pred), axis=0)
col_TN = K.sum((1 - y_true) * (1 - K.round(y_pred)), axis=0)
col_FP = K.sum((1 - y_true) * K.round(y_pred), axis=0)
col_FN = K.sum(y_true * (1 - K.round(y_pred)), axis=0)
precsion = K.mean(col_TP / (col_TP + col_FP + K.epsilon()))
return precsion
def macro_getRecall(y_true, y_pred):
# print(y_true)
row_TP = K.sum(y_true * K.round(y_pred), axis=0)
row_TN = K.sum((1 - y_true) * (1 - K.round(y_pred)), axis=0)
row_FP = K.sum((1 - y_true) * K.round(y_pred), axis=0)
row_FN = K.sum(y_true * (1 - K.round(y_pred)), axis=0)
recall = K.mean(row_TP / (row_TP + row_FN + K.epsilon()))
return recall
def macro_getF1score(y_true, y_pred):
precision = macro_getPrecison(y_true, y_pred)
recall = macro_getRecall(y_true, y_pred)
F1score = 2 * precision * recall / (precision + recall)
return F1score
"""
C2AE损失函数:
custom_loss:
返回模型最后一层target_loss的平方和,这里y_true是随机设的
a_cross_loss:
返回输出的二分类交叉熵
"""
def custom_loss(y_true,y_pred):
return 1*tf.reduce_mean(tf.square(y_pred))
def a_cross_loss(y_true, y_pred):
cross_loss = tf.add(tf.log(1e-10 + y_pred) * y_true, tf.log(1e-10 + (1 - y_pred)) * (1 - y_true))
cross_entropy_label = -1 * tf.reduce_mean(tf.reduce_sum(cross_loss, 1))
return 0.1*cross_entropy_label
self.keras_train_model.compile(
loss = {'target_index_3':a_cross_loss,'targets_loss':custom_loss},
optimizer=optimizer,
metrics={'target_index_3':[exactMatch, micro_getPrecsion, micro_getRecall,micro_F1score,macro_getPrecison,macro_getRecall,macro_getF1score]}
)
if __name__ == "__main__":
config = Config(set_defaults=True, load_from_args=True, verify=False)
model = Code2VecModel(config)
model._create_keras_model()