#!/usr/bin/python
import numpy as np
from math import sqrt

from pyspark.sql.functions import rand
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.mllib.evaluation import MulticlassMetrics

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

from bigdl.util.common import *
from bigdl.optim.optimizer import *
from bigdl.nn.criterion import *
from bigdl.nn.layer import *

from sklearn.metrics import roc_curve, auc

from ML_utils import *

hdfspath = "hdfs://10.0.3.13:9000"
logfile = "log.txt"
datapath = "%s/DataMC" % hdfspath

num_executors = 3
num_cores = 15
# Number of model trainers, derived from the number of executors and cores per executor.
num_workers = num_executors * num_cores

NUMPARTCHG = 10   # number of charged daughters kept per jet
NUMPARTNTR = 15   # number of neutral daughters kept per jet
PADVAL = 0        # padding value for jets with fewer daughters

jetlabel = ["bb", "cc", "qq"]
nb_classes = len(jetlabel)

# Define the input features for each category.
vtxname = ['fdrMin', 'ptSvrJet', 'nTrk', 'nTrkJet', 'drSvrJet', 'absQSum', 'm', 'mCor',
           'fdChi2', 'ipChi2Sum', 'pass', 'tau', 'z', 'pt']
glbname = ['PX', 'PY', 'PZ', 'PE', 'M']
chgname = ['IP', 'Q', 'E', 'pT', 'pX', 'pY', 'pZ', 'ID',
           'Eta', 'Phi', 'Chi2', 'QoverP', 'NNe', 'NNk', 'NNpi', 'NNmu',
           'trackX', 'trackY', 'trackZ', 'trackVX', 'trackVY', 'trackVZ']
ntrname = ['E', 'pT', 'pX', 'pY', 'pZ', 'ID', 'Eta', 'Phi',
           'CaloNeutralEcal', 'CaloNeutralHcal2Ecal', 'CaloNeutralE49', 'CaloNeutralPrs']

nfc = len(chgname)
nfn = len(ntrname)
nfv = len(vtxname)
nfg = len(glbname)

chgcol = ['chg_%02d_%s' % (p, n) for p in range(NUMPARTCHG) for n in chgname]
ntrcol = ['ntr_%02d_%s' % (p, n) for p in range(NUMPARTNTR) for n in ntrname]
vtxcol = ['vtx_%s' % n for n in vtxname]
glbcol = ['glb_%s' % n for n in glbname]


def process(inRow):
    """Flatten one event Row into fixed-length charged/neutral/vertex/global feature columns."""
    rargs = dict()
    dau_Qs = np.array(inRow['Daughters_Q'])
    max_daus = inRow['nDaughters']

    # Charged daughters: keep the NUMPARTCHG with the largest impact parameter.
    tmpi, = np.where(dau_Qs[:max_daus] != 0)
    dau_IPs = np.array(inRow['Daughters_IP'])
    chgidx = tmpi[np.argsort(-dau_IPs[tmpi])]
    padlen = NUMPARTCHG - len(chgidx[:NUMPARTCHG])
    for item in chgname:
        chgfeatures = np.pad([inRow["Daughters_%s" % item][x] for x in chgidx[:NUMPARTCHG]],
                             (0, padlen), 'constant', constant_values=PADVAL).tolist()
        for idx in range(NUMPARTCHG):
            rargs["chg_%02d_%s" % (idx, item)] = chgfeatures[idx]

    # Neutral daughters: keep the NUMPARTNTR with the largest E * pZ / |p|.
    tmpi, = np.where(dau_Qs[:max_daus] == 0)
    dau_Es = np.array(inRow['Daughters_E'])
    dau_pXs = np.array(inRow['Daughters_pX'])
    dau_pYs = np.array(inRow['Daughters_pY'])
    dau_pZs = np.array(inRow['Daughters_pZ'])
    ntridx = tmpi[np.argsort([-dau_Es[x] * dau_pZs[x] /
                              sqrt(dau_pXs[x]**2 + dau_pYs[x]**2 + dau_pZs[x]**2)
                              for x in tmpi])]
    padlen = NUMPARTNTR - len(ntridx[:NUMPARTNTR])
    for item in ntrname:
        ntrfeatures = np.pad([inRow["Daughters_%s" % item][x] for x in ntridx[:NUMPARTNTR]],
                             (0, padlen), 'constant', constant_values=PADVAL).tolist()
        for idx in range(NUMPARTNTR):
            rargs["ntr_%02d_%s" % (idx, item)] = ntrfeatures[idx]

    # Secondary-vertex (BDT tag) features.
    for item in vtxname:
        rargs["vtx_%s" % item] = inRow["BDTTag_%s" % item][0]

    # Global jet features.
    for item in glbname:
        rargs["glb_%s" % item] = inRow[item]
    rargs["glb_NPartChg"] = len(chgidx)
    rargs["glb_NPartNtr"] = len(ntridx)

    rargs["evtLabel"] = inRow['label']
    return Row(**rargs)


spark = SparkSession.builder.appName("Prepare Root File").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
init_engine()

inFile = ["Dijet_bb_pt10_15_dw", "Dijet_bb_pt15_20_dw", "Dijet_bb_pt20_50_dw", "Dijet_bb_pt50_dw",
          "Dijet_cc_pt10_15_up", "Dijet_cc_pt15_20_dw", "Dijet_cc_pt20_50_dw", "Dijet_cc_pt50_dw",
          "Dijet_qq_pt10_15_up",
"Dijet_qq_pt15_20_dw", "Dijet_qq_pt20_50_dw", "Dijet_qq_pt50_dw"] inData = [] for fname in inFile: print("Read input file: %s..."%fname) inData.append(spark.read.format("com.databricks.spark.avro").load("%s/%s.avro"%(datapath, fname)).rdd.map(process).toDF()) columns = inData[0].columns raw_dataset = inData[0].select(columns) for i in range(len(inFile))[1:]: raw_dataset = raw_dataset.union(inData[i].select(columns)) vector_assembler = VectorAssembler(inputCols=chgcol, outputCol="chgfeatures") raw_dataset = vector_assembler.transform(raw_dataset) #vector_assembler = VectorAssembler(inputCols=ntrcol, outputCol="ntrfeatures") #raw_dataset = vector_assembler.transform(raw_dataset) vector_assembler = VectorAssembler(inputCols=vtxcol, outputCol="vtxfeatures") raw_dataset = vector_assembler.transform(raw_dataset) vector_assembler = VectorAssembler(inputCols=glbcol, outputCol="glbfeatures") raw_dataset = vector_assembler.transform(raw_dataset) label_indexer = StringIndexer(inputCol="evtLabel", outputCol="label_index").fit(raw_dataset) raw_dataset = label_indexer.transform(raw_dataset) # shuffle data raw_dataset = raw_dataset.orderBy(rand()) #featureCol=["chgfeatures", "ntrfeatures", "vtxfeatures", "glbfeatures"] featureCol=["chgfeatures", "vtxfeatures", "glbfeatures"] ## It seems that bigdl wants labels > 0 sample = raw_dataset.rdd.map(lambda row: Sample.from_ndarray([np.asarray(row.__getitem__(feature)) for feature in featureCol], np.asarray(row.__getitem__("label_index")) +1 ) ) # Finally, we create a trainingset and a testset. print("Create training and set data and cached them ...") (training_set, test_set, not_used) = sample.randomSplit([0.5, 0.2, 0.3]) training_set.cache() test_set.cache() print("Training set: %d"%training_set.count()) #### Define model chgHiddenLayer = 150 ntrHiddenLayer = 150 #DNNmodel = Sequential()\ # .add(BatchNormalization(chgHiddenLayer+ntrHiddenLayer+nfv+nfg)) \ # .add(Linear(chgHiddenLayer+ntrHiddenLayer+nfv+nfg, 350)) \ DNNmodel = Sequential()\ .add(BatchNormalization(chgHiddenLayer+nfv+nfg)) \ .add(Linear(chgHiddenLayer+nfv+nfg, 350)) \ .add(Dropout(init_p=0.1)) \ .add(Linear(350, 224)) \ .add(Dropout(init_p=0.1)) \ .add(Linear(224, 128)) \ .add(Dropout(init_p=0.1)) \ .add(Linear(128, 96)) \ .add(Dropout(init_p=0.1)) \ .add(Linear(96, 96)) \ .add(Dropout(init_p=0.1)) \ .add(Linear(96, 3)) \ .add(SoftMax()) chgmodel = Sequential().add(Reshape([NUMPARTCHG,nfc])).add(Reverse(2)).add(Recurrent().add(LSTM(nfc, chgHiddenLayer, p=0.1))).add(Select(2,-1)) #ntrmodel = Sequential().add(Reshape([NUMPARTNTR,nfn])).add(Reverse(2)).add(Recurrent().add(LSTM(nfn, ntrHiddenLayer, p=0.1))).add(Select(2,-1)) vtxmodel = Sequential().add(BatchNormalization(nfv)) glbmodel = Sequential().add(BatchNormalization(nfg)) #branches = ParallelTable().add(chgmodel).add(ntrmodel).add(vtxmodel).add(glbmodel) branches = ParallelTable().add(chgmodel).add(vtxmodel).add(glbmodel) #bigdl_model = Sequential().add(branches).add(JoinTable(2,4)).add(DNNmodel) bigdl_model = Sequential().add(branches).add(JoinTable(2,3)).add(DNNmodel) logDir = "BigDlLog" appName = "Adam5" batchsize = 32 * num_cores * num_executors epoch = 20 optim = Adam(learningrate=0.0002) #optim = Adagrad(learningrate=0.0002) #optim = SGD(learningrate=0.002) #criterion = MeanSquaredLogarithmicCriterion() criterion = ClassNLLCriterion(logProbAsInput=False) trainer = Optimizer(model=bigdl_model, training_rdd=training_set, criterion=criterion, optim_method=optim, end_trigger=MaxEpoch(epoch), batch_size=batchsize) #valtrigger = EveryEpoch() 
valtrigger = SeveralIteration(1000)
trainer.set_validation(batch_size=batchsize, val_rdd=test_set, trigger=valtrigger,
                       val_method=[Loss(criterion), Top1Accuracy()])

trainSummary = TrainSummary(log_dir=logDir, app_name=appName)
valSummary = ValidationSummary(log_dir=logDir, app_name=appName)
trainer.set_train_summary(trainSummary)
trainer.set_val_summary(valSummary)
trainer.set_checkpoint(MaxEpoch(epoch), "BigDlModels")

print("Start optimization...")
print("Appname: %s epochs: %d batch: %d" % (appName, epoch, batchsize))
trainedModel = trainer.optimize()

print("Evaluate model...")
modEvalLoss = trainedModel.evaluate(test_set, 200, [Loss(criterion)])
print(modEvalLoss[0])
modEvalAcc1 = trainedModel.evaluate(test_set, 200, [Top1Accuracy()])
print(modEvalAcc1[0])

#predictResult = trainedModel.predict(test_set)
#predictResultClass = trainedModel.predict_class(test_set)
#y_pred = predictResult.map(lambda x: np.array(x).argmax()).collect()
#y_true = test_set.map(lambda x: (int(x.labels[0].storage - 1))).collect()

# Pair each test Sample with its predicted class probabilities.
pred = test_set.zip(trainedModel.predict(test_set))
# (predicted class, true class), with labels shifted back to 0-based.
predictionAndLabels = pred.map(lambda lp: (float(np.array(lp[1]).argmax()),
                                           float(lp[0].labels[0].storage - 1)))

metrics = MulticlassMetrics(predictionAndLabels)
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

for label in [0.0, 1.0, 2.0]:
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))

# Weighted stats
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

#cm = confusion_matrix(y_true, y_pred)
#confmatrix(cm, jetlabel, "confMatrix.png")
cm = metrics.confusionMatrix()
confmatrix(cm.toArray(), jetlabel, "confMatrix.png")

# (class probabilities, true class) for the ROC curves.
probAndLabels = pred.map(lambda lp: (lp[1], float(lp[0].labels[0].storage - 1)))
pl = np.array(probAndLabels.collect())

# Compute the ROC curve and ROC area for each class (one-vs-rest).
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(nb_classes):
    fpr[i], tpr[i], _ = roc_curve(pl[:, 1].astype(int),
                                  np.array(pl[:, 0].tolist())[:, i], pos_label=i)
    roc_auc[i] = auc(fpr[i], tpr[i])

# Plot the per-class ROC curves.
plotclassroc(fpr, tpr, roc_auc, jetlabel, "roc.png")

spark.sparkContext.stop()