|
import sys

sys.path.append('../')

import tensorflow as tf

# Prefer the internal `tensorflow.python.keras` layout (older TF 1.x releases);
# fall back to the public `tensorflow.keras` package on newer versions.
try:
    import tensorflow.python.keras as keras
    from tensorflow.python.keras import layers
    import tensorflow.python.keras.backend as K
except ImportError:  # was a bare `except:`, which would also swallow SystemExit/KeyboardInterrupt
    import tensorflow.keras as keras
    from tensorflow.keras import layers
    import tensorflow.keras.backend as K

from typing import Optional

from PIPE.config import Config
from PIPE.keras_attention_layer import Attention_layer
|
|
|
|
|
|
class Code2VecModel():
    """Code2vec path-attention encoder with a C2AE-style multi-label
    classification head, built with the Keras functional API.

    The model is constructed lazily: instantiate with a ``Config``, then call
    :meth:`_create_keras_model` followed by :meth:`_comile_keras_model`.
    """

    def __init__(self, config: Config):
        # Populated by _create_keras_model(); None until then.
        self.keras_train_model: Optional[keras.Model] = None
        self.config = config

    ############################# model construction #############################
    def _create_keras_model(self):
        """Build the training model and store it in ``self.keras_train_model``.

        Inputs (each padded/truncated to ``config.MAX_CONTEXTS`` contexts):
          * path_source_token_input / path_input / path_target_token_input —
            int32 id tensors for the (source token, path, target token) triples.
          * context_valid_mask — mask distinguishing real contexts from padding.
          * targets_input — ground-truth multi-hot label vector of size
            ``config.categories``, consumed by the C2AE label-embedding branch.

        Outputs:
          * target_index_3 — per-category sigmoid predictions.
          * targets_loss — difference between the label embedding and the code
            embedding in the shared latent space (driven to zero by the
            custom loss during training).
        """
        path_source_token_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        path_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        path_target_token_input = layers.Input((self.config.MAX_CONTEXTS,), dtype=tf.int32)
        context_valid_mask = layers.Input((self.config.MAX_CONTEXTS,))

        # Path embedding: (None, max_contexts) -> (None, max_contexts, PATH_EMBEDDINGS_SIZE)
        paths_embedded = layers.Embedding(
            self.config.path_vocab_size, self.config.PATH_EMBEDDINGS_SIZE, name='path_embedding'
        )(path_input)

        # Terminal-token embedding, shared between source and target tokens:
        # (None, max_contexts) -> (None, max_contexts, TOKEN_EMBEDDINGS_SIZE)
        token_embedding_shared_layer = layers.Embedding(
            self.config.token_vocab_size, self.config.TOKEN_EMBEDDINGS_SIZE, name='token_embedding'
        )
        path_source_token_embedded = token_embedding_shared_layer(path_source_token_input)
        path_target_token_embedded = token_embedding_shared_layer(path_target_token_input)

        # Each context becomes [source ; path ; target]:
        # 3 * (None, max_contexts, emb) -> (None, max_contexts, 3*emb)
        context_embedded = layers.Concatenate()(
            [path_source_token_embedded, paths_embedded, path_target_token_embedded]
        )
        # Keras Dropout takes a *drop* rate; the config stores a *keep* rate.
        context_embedded = layers.Dropout(1 - self.config.DROPOUT_KEEP_RATE)(context_embedded)

        # Per-context projection:
        # (None, max_contexts, 3*emb) -> (None, max_contexts, CODE_VECTOR_SIZE)
        context_after_dense = layers.TimeDistributed(
            layers.Dense(self.config.CODE_VECTOR_SIZE, use_bias=False, activation='tanh')
        )(context_embedded)

        # Attention pooling over contexts:
        # (None, max_contexts, CODE_VECTOR_SIZE) -> (None, CODE_VECTOR_SIZE)
        # attention_weights is produced by the layer but not used further here.
        code_vectors, attention_weights = Attention_layer(name='attention')(
            [context_after_dense, context_valid_mask]
        )

        # ---------------- C2AE classification head ----------------
        # Fx: project the code vector into the 186-dimensional shared latent space.
        Fx = layers.Dense(
            186, use_bias=True, activation=None, name='Fx'
        )(code_vectors)
        Fx_act = tf.tanh(Fx)  # activation is tanh (the original local name said "relu")
        Fx_dropout = layers.Dropout(0.5)(Fx_act)

        # Label-embedding branch: encode the ground-truth labels into the same
        # latent space so the two representations can be aligned.
        targets_input = layers.Input((self.config.categories,), dtype=tf.float32)
        targets_hidden = layers.Dense(
            186, use_bias=True, activation=None, name='targets_hidden'
        )(targets_input)
        targets_hidden_act = tf.tanh(targets_hidden)
        targets_hidden_dropout = layers.Dropout(0.5)(targets_hidden_act)
        targets_output = layers.Dense(
            186, use_bias=True, activation=None, name='targets_embedding'
        )(targets_hidden_dropout)
        targets_output_act = tf.tanh(targets_output)
        targets_output_dropout = layers.Dropout(0.5)(targets_output_act)

        # Latent alignment output: label embedding minus code embedding.
        targets_loss = layers.subtract([targets_output_dropout, Fx_dropout], name='targets_loss')

        # Decoder Fd1/Fd: map the code latent vector back to category scores.
        Fd1 = layers.Dense(
            186, use_bias=True, activation=None, name='Fd1'
        )(Fx_dropout)
        Fd1_act = tf.tanh(Fd1)
        Fd1_dropout = layers.Dropout(0.5)(Fd1_act)

        Fd = layers.Dense(
            186, use_bias=True, activation=None, name='Fd'
        )(Fd1_dropout)
        Fd_act = tf.tanh(Fd)
        Fd_dropout = layers.Dropout(0.5)(Fd_act)

        target_index_3 = layers.Dense(
            self.config.categories, use_bias=True, activation='sigmoid', name='target_index_3'
        )(Fd_dropout)

        inputs = [path_source_token_input, path_input, path_target_token_input, context_valid_mask, targets_input]
        self.keras_train_model = keras.Model(inputs=inputs, outputs=[target_index_3, targets_loss])

        print("------------------create_keras_model Done.-------------------------")

    @classmethod
    def _create_optimizer(cls):
        """Return the default Adam optimizer used for training.

        NOTE(review): ``lr``/``decay`` are the legacy TF1-era argument names;
        recent tf.keras releases expect ``learning_rate`` and have removed
        ``decay`` — confirm against the pinned TensorFlow version.
        """
        # First parameter renamed self -> cls: this is a @classmethod, so it
        # receives the class, not an instance. Behavior is unchanged.
        return tf.keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)

    def _comile_keras_model(self, optimizer=None):
        """Compile ``self.keras_train_model`` with the C2AE losses and metrics.

        NOTE(review): the method name keeps its original (misspelled) form
        because external callers may reference it.

        :param optimizer: optional Keras optimizer; defaults to
            :meth:`_create_optimizer`.
        """
        if optimizer is None:
            optimizer = self._create_optimizer()

        ############################# evaluation metrics #############################
        def exactMatch(y_true, y_pred):
            # Fraction of samples whose entire rounded label vector matches
            # the ground truth exactly (multi-label subset accuracy).
            y_pred1 = y_pred
            row_dif = K.cast(K.sum(K.round(K.clip(y_true * (1 - y_pred1) + (1 - y_true) * y_pred1, 0, 1)), axis=1) > K.epsilon(), 'float32')
            dif = K.sum(K.round(row_dif))
            row_equ = K.cast(K.abs(K.sum(K.round(K.clip(y_true * y_pred1 + (1 - y_true) * (1 - y_pred1), 0, 1)), axis=1) - self.config.categories) < K.epsilon(), 'float32')
            equ = K.sum(K.round(row_equ))
            return equ / (equ + dif + K.epsilon())

        def micro_getPrecsion(y_true, y_pred):
            # Micro-averaged precision: TP/FP pooled over every label.
            # K.epsilon() added to avoid NaN when nothing is predicted positive
            # (the macro metrics below already guard this way).
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
            precision = TP / (TP + FP + K.epsilon())
            return precision

        def micro_getRecall(y_true, y_pred):
            # Micro-averaged recall: TP/FN pooled over every label.
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
            recall = TP / (TP + FN + K.epsilon())
            return recall

        def micro_F1score(y_true, y_pred):
            # Harmonic mean of micro precision and micro recall.
            TP = tf.reduce_sum(y_true * tf.round(y_pred))
            FP = tf.reduce_sum((1 - y_true) * tf.round(y_pred))
            FN = tf.reduce_sum(y_true * (1 - tf.round(y_pred)))
            precision = TP / (TP + FP + K.epsilon())
            recall = TP / (TP + FN + K.epsilon())
            F1score = 2 * precision * recall / (precision + recall + K.epsilon())
            return F1score

        def macro_getPrecison(y_true, y_pred):
            # Macro-averaged precision: per-label precision, then mean.
            col_TP = K.sum(y_true * K.round(y_pred), axis=0)
            col_FP = K.sum((1 - y_true) * K.round(y_pred), axis=0)
            precsion = K.mean(col_TP / (col_TP + col_FP + K.epsilon()))
            return precsion

        def macro_getRecall(y_true, y_pred):
            # Macro-averaged recall: per-label recall, then mean.
            row_TP = K.sum(y_true * K.round(y_pred), axis=0)
            row_FN = K.sum(y_true * (1 - K.round(y_pred)), axis=0)
            recall = K.mean(row_TP / (row_TP + row_FN + K.epsilon()))
            return recall

        def macro_getF1score(y_true, y_pred):
            # Harmonic mean of macro precision and macro recall.
            precision = macro_getPrecison(y_true, y_pred)
            recall = macro_getRecall(y_true, y_pred)
            F1score = 2 * precision * recall / (precision + recall + K.epsilon())
            return F1score

        # ---------------- C2AE loss functions ----------------
        # custom_loss: mean squared magnitude of the `targets_loss` model output
        #   (latent alignment); its y_true is a dummy placeholder.
        # a_cross_loss: element-wise binary cross-entropy over the label
        #   predictions, summed per sample and scaled by 0.1.
        def custom_loss(y_true, y_pred):
            return 1 * tf.reduce_mean(tf.square(y_pred))

        def a_cross_loss(y_true, y_pred):
            # tf.log was removed in TF 2.x; tf.math.log is available in both
            # TF 1.x (>=1.5) and 2.x. 1e-10 guards log(0).
            cross_loss = tf.add(tf.math.log(1e-10 + y_pred) * y_true, tf.math.log(1e-10 + (1 - y_pred)) * (1 - y_true))
            cross_entropy_label = -1 * tf.reduce_mean(tf.reduce_sum(cross_loss, 1))
            return 0.1 * cross_entropy_label

        self.keras_train_model.compile(
            loss={'target_index_3': a_cross_loss, 'targets_loss': custom_loss},
            optimizer=optimizer,
            metrics={'target_index_3': [exactMatch, micro_getPrecsion, micro_getRecall, micro_F1score, macro_getPrecison, macro_getRecall, macro_getF1score]}
        )
|
|
|
|
if __name__ == "__main__":
    # Smoke test: load configuration from command-line args and build the model graph.
    cfg = Config(set_defaults=True, load_from_args=True, verify=False)
    code2vec = Code2VecModel(cfg)
    code2vec._create_keras_model()
|
|
|