Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.
 
 
 

250 рядки
9.8 KiB

import sys
sys.path.append('../')
from PIPE.config import Config
from PIPE.keras_model import Code2VecModel
# from data_process import Code2tags
import pandas as pd
import os
os_path = '/'.join(os.path.abspath(__file__).split('/')[:-1])
# print(os_path)
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import TensorBoard
class PIPEModel():
    """Train, restore and query the Keras model built by ``Code2VecModel``.

    Every filesystem location used by this class is derived from the
    module-level ``os_path`` (the directory containing this file):

    * ``training/C2AE_model``  - checkpoints and TensorBoard logs
    * ``codeData/outData``     - vocabulary CSVs used to encode contexts
    * ``predict/predictCode``  - where the code to classify is written
    * ``predict/parsedCode``   - where ``cli.jar`` writes its output
    """

    # Class-level defaults so the "model not loaded" guards in predict() and
    # evaluate() work before train()/load_savedModel() have been called
    # (previously this surfaced as an AttributeError).
    cur_model = None
    # True once predict_code() has loaded the vocabulary tables.
    already_get_hashed_table = False

    # Human-readable label for each output index of the classifier.
    _INDEX2STR = (
        '输入变量错误', '无输出', '输出格式错误', '初始化错误', '数据类型错误',
        '数据精度错误', '循环错误', "分支错误", "逻辑错误", "运算符错误",
    )

    def __init__(self):
        """Create the configuration, the model wrapper and all paths."""
        self.config = Config()
        self.model = Code2VecModel(self.config)
        self.cur_model = None
        self.already_get_hashed_table = False
        self.modelSaved_path = os_path + '/training'
        self.base_path = os_path + "/codeData/"
        self.parsed_code_path = os_path + '/predict/parsedCode/'
        self.predict_code_path = os_path + '/predict/predictCode/'
        self.outData_path = self.base_path + 'outData/'

    def _build_model(self):
        """Create and compile the Keras model and return the training model."""
        self.model._create_keras_model()
        # (sic) the method name is spelled exactly as the original code calls it.
        self.model._comile_keras_model()
        return self.model.keras_train_model

    ################################# Model training #################################
    def train(self, x_inputs, y_inputs):
        """Fit the model and keep it in ``self.cur_model``.

        Checkpoints (weights only) are written to
        ``<modelSaved_path>/C2AE_model/cp-<epoch>.ckpt`` and TensorBoard logs
        to ``<modelSaved_path>/C2AE_model/logs``. The training history is
        printed to stdout.

        Args:
            x_inputs: model inputs, forwarded unchanged to ``fit``.
            y_inputs: model targets, forwarded unchanged to ``fit``.
        """
        cur_model = self._build_model()
        modelSaved_path = os.path.join(self.modelSaved_path, 'C2AE_model')
        # makedirs also creates the parent ``training`` directory, which a
        # plain mkdir would fail on when it is missing.
        os.makedirs(modelSaved_path, exist_ok=True)
        checkpoint_path = os.path.join(modelSaved_path, "cp-{epoch:04d}.ckpt")
        # NOTE(review): ``period`` is reported as deprecated in favour of
        # ``save_freq`` by newer TensorFlow releases - confirm against the
        # TensorFlow version this project pins before changing it.
        cp_callback = tf.keras.callbacks.ModelCheckpoint(
            filepath=checkpoint_path, verbose=1,
            save_weights_only=True,
            period=self.config.SAVE_EVERY_EPOCHS)
        # Save the untrained weights as "epoch 0".
        cur_model.save_weights(checkpoint_path.format(epoch=0))
        tensorboard = TensorBoard(log_dir=os.path.join(modelSaved_path, "logs"))
        history = cur_model.fit(
            x_inputs, y_inputs,
            batch_size=self.config.TRAIN_BATCH_SIZE,
            epochs=self.config.NUM_TRAIN_EPOCHS,
            callbacks=[cp_callback, tensorboard],
            validation_split=1/4)
        print("History:")
        print(history.history)
        self.cur_model = cur_model

    ################################# Model loading ##################################
    def load_savedModel(self, checkpoint_path=None):
        """Build the model and load weights from a checkpoint.

        Args:
            checkpoint_path: checkpoint to load. When omitted, the previously
                hard-coded ``<modelSaved_path>/C2AE_model/cp-0015.ckpt`` is
                used, so existing zero-argument callers are unaffected.
        """
        if checkpoint_path is None:
            checkpoint_path = self.modelSaved_path + '/C2AE_model/cp-0015.ckpt'
        cur_model = self._build_model()
        cur_model.load_weights(checkpoint_path)
        self.cur_model = cur_model

    ################################# Prediction #####################################
    def predict(self, inputs):
        """Return ``self.cur_model.predict(inputs)``.

        Raises:
            Exception: "model not loaded" if no model has been trained/loaded.
        """
        if self.cur_model is None:
            raise Exception("model not loaded")
        return self.cur_model.predict(inputs)

    def evaluate(self, inputs, targets):
        """Return ``self.cur_model.evaluate(inputs, targets)``.

        Raises:
            Exception: "model not loaded" if no model has been trained/loaded.
        """
        if self.cur_model is None:
            raise Exception("model not loaded")
        return self.cur_model.evaluate(inputs, targets)

    @staticmethod
    def _read_mapping(csv_path, key_column, value_column):
        """Read ``csv_path`` and return ``{row[key_column]: row[value_column]}``.

        Later rows win on duplicate keys, as with the original row-by-row fill.
        """
        # NOTE(review): pandas' default NA parsing turns cells such as
        # ``NULL``/``NA``/``null`` into NaN. If token vocabularies can contain
        # such literals, ``keep_default_na=False`` may be needed - confirm
        # against whatever produces these CSVs.
        frame = pd.read_csv(csv_path)
        return dict(zip(frame[key_column], frame[value_column]))

    def get_hashed_table(self):
        """(Re)load the vocabulary tables from ``self.outData_path``.

        Populates ``self.node_types_dict`` (node_type -> id),
        ``self.paths_dict`` (path -> id) and ``self.tokens_dict``
        (token -> id) from ``node_types.csv``, ``paths.csv`` and
        ``tokens.csv`` respectively.
        """
        self.node_types_dict = self._read_mapping(
            os.path.join(self.outData_path, "node_types.csv"), 'node_type', 'id')
        self.paths_dict = self._read_mapping(
            os.path.join(self.outData_path, "paths.csv"), 'path', 'id')
        self.tokens_dict = self._read_mapping(
            os.path.join(self.outData_path, "tokens.csv"), 'token', 'id')

    def _run_path_extractor(self):
        """Run ``cli.jar`` on ``predict_code_path``, writing to ``parsed_code_path``.

        Raises:
            AssertionError: if java cannot be started or exits non-zero.
        """
        # Local import: the file's import block does not bring in subprocess.
        import subprocess

        # An argument list (no shell) keeps paths containing spaces intact.
        command = [
            'java', '-jar', os_path + '/cli.jar', 'pathContexts',
            '--lang', 'c',
            '--project', self.predict_code_path,
            '--output', self.parsed_code_path,
            '--maxH', '8', '--maxW', '2',
            '--maxContexts', str(self.config.MAX_CONTEXTS),
            '--maxTokens', str(self.config.MAX_TOKEN_VOCAB_SIZE),
            '--maxPaths', str(self.config.MAX_PATH_VOCAB_SIZE),
        ]
        # AssertionError is kept because the original used ``assert ret == 0``;
        # raising it explicitly keeps the check alive under ``python -O``.
        try:
            ret = subprocess.run(command).returncode
        except OSError as err:
            raise AssertionError("could not launch: " + ' '.join(command)) from err
        if ret != 0:
            raise AssertionError(
                "cli.jar exited with status %d: %s" % (ret, ' '.join(command)))

    def _encode_contexts(self, raw_contexts, temp_tokens, temp_paths, temp_node_types):
        """Translate extractor-local ids into vocabulary ids.

        Args:
            raw_contexts: iterable of ``"source,path,target"`` id triples.
            temp_tokens: extractor id -> token.
            temp_paths: extractor id -> space-separated node-type ids.
            temp_node_types: extractor id -> node type.

        Returns:
            ``(sources, paths, targets, valid)`` lists of equal length. A
            context that cannot be mapped into the vocabulary is encoded as
            all zeros with a ``valid`` flag of 0.
        """
        sources, paths, targets, valid = [], [], [], []
        for raw in raw_contexts:
            temp_source, temp_path, temp_target = map(int, raw.split(','))
            try:
                real_source = self.tokens_dict[temp_tokens[temp_source]]
                real_target = self.tokens_dict[temp_tokens[temp_target]]
                # A path is keyed in the vocabulary by the space-joined
                # vocabulary ids of its node types.
                node_type_names = (temp_node_types[int(node_id)]
                                   for node_id in temp_paths[temp_path].split(' '))
                path_key = ' '.join(str(self.node_types_dict[name])
                                    for name in node_type_names)
                real_path = self.paths_dict[path_key]
            except (KeyError, ValueError, AttributeError, TypeError):
                # The context cannot be mapped into the vocabulary (dataset
                # coverage or cli.jar behaviour); zero it out so it is ignored.
                real_source = real_path = real_target = 0
                is_valid = 0
            else:
                is_valid = 1
            sources.append(real_source)
            paths.append(real_path)
            targets.append(real_target)
            valid.append(is_valid)
        return sources, paths, targets, valid

    def _pad_contexts(self, values):
        """Right-pad ``values`` with zeros up to ``config.MAX_CONTEXTS``."""
        return np.pad(values, (0, self.config.MAX_CONTEXTS - len(values)),
                      'constant', constant_values=(0, 0))

    def _format_prediction(self, probabilities):
        """Render ``label:xx.xx%`` lines sorted by descending probability.

        Only the first ``config.categories`` entries are used; ties are
        ordered by label, descending.
        """
        scores = {self._INDEX2STR[i]: probabilities[i]
                  for i in range(self.config.categories)}
        ranked = sorted(scores.items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
        return ''.join("%s:%.2f%%\n" % (label, prob * 100) for label, prob in ranked)

    ################################# Predict on code text ###########################
    def predict_code(self, code=''):
        """Classify a piece of C source text.

        The text is written to ``<predict_code_path>/main.c``, parsed by
        ``cli.jar``, encoded with the vocabulary tables and fed to the loaded
        model.

        Args:
            code: the source text to classify.

        Returns:
            A string with one ``label:probability%`` line per category,
            highest probability first.

        Raises:
            AssertionError: if the path extractor fails.
            Exception: "model not loaded" if no model is available.
        """
        # The vocabulary tables are loaded once per instance; call
        # get_hashed_table() explicitly to force a refresh.
        if not self.already_get_hashed_table:
            self.get_hashed_table()
            self.already_get_hashed_table = True

        os.makedirs(self.predict_code_path, exist_ok=True)
        code_path = os.path.join(self.predict_code_path, "main.c")
        with open(code_path, 'w') as f:
            f.write(code)

        self._run_path_extractor()

        temp_node_types = self._read_mapping(
            os.path.join(self.parsed_code_path, "node_types.csv"), 'id', 'node_type')
        temp_paths = self._read_mapping(
            os.path.join(self.parsed_code_path, "paths.csv"), 'id', 'path')
        temp_tokens = self._read_mapping(
            os.path.join(self.parsed_code_path, "tokens.csv"), 'id', 'token')

        # Only the first line is read; its first space-separated field is
        # skipped (presumably a label - confirm against the cli.jar format).
        contexts_path = os.path.join(self.parsed_code_path, "path_contexts_0.csv")
        with open(contexts_path, "r") as contexts_file:
            first_line = contexts_file.readline()
        # Blank fields (e.g. a trailing space before the newline) are dropped,
        # and the list is capped so the padding width can never go negative.
        raw_contexts = [field for field in first_line.split(' ')[1:] if field.strip()]
        raw_contexts = raw_contexts[:self.config.MAX_CONTEXTS]

        sources, paths, targets, valid = self._encode_contexts(
            raw_contexts, temp_tokens, temp_paths, temp_node_types)

        source_input = [self._pad_contexts(sources)]
        path_input = [self._pad_contexts(paths)]
        target_input = [self._pad_contexts(targets)]
        context_valid_input = [self._pad_contexts(valid)]
        # All-zero placeholder for the model's fifth input.
        # NOTE(review): the width 10 is hard-coded while the output loop uses
        # config.categories - confirm the two are meant to agree.
        lab = [np.array([0] * 10)]
        inputs = (source_input, path_input, target_input, context_valid_input, lab)

        result = self.predict(inputs)
        results = result[0][0].tolist()
        return self._format_prediction(results)
# if __name__ == '__main__':
###################### Train the model #####################################################
# data=Code2tags()
# train_inputs=np.load(data.sourceData_path+"train_inputs.npy",allow_pickle=True).tolist()
# test_inputs=np.load(data.sourceData_path+"test_inputs.npy",allow_pickle=True).tolist()
#
# x_train=train_inputs[:-2]
# y_train=train_inputs[-2:]
#
# model=PIPEModel()
# model.train(x_train,y_train)
# # modelSaved_path = os.path.join(model.modelSaved_path,"C2AE_model")
# # model.load_savedModel(os.path.join(modelSaved_path,"cp-0080.ckpt"))
# answer = model.evaluate(test_inputs[:-2],test_inputs[-2:])
# print(answer)
####################### Predict on code ############################################
# model=PIPEModel()
# modelSaved_path = os.path.join(model.modelSaved_path,"C2AE_model")
# model.load_savedModel(os.path.join(modelSaved_path,"cp-0015.ckpt"))
############################# Pass in the code text ########################################
# code_text=open("./predict/predictCode/code.txt","r").read()
# code_text = ''
# include <stdio.h>
#
# int
# main()
# {
# return 0;
# }
# answer = model.predict_code(code=code_text)
# answer = model.predict_code()
# print(answer)