Fix the memory overflow caused by modifying the filter parameter.

This commit is contained in:
sunwen
2023-07-03 17:12:53 +08:00
parent 70b6cbb526
commit 6c7f967160

View File

@@ -19,10 +19,13 @@
#include <cstddef>
#include <mkl_cblas.h>
#include "MatlabReader.h"
#include <iostream>
#include <mkl_vml_functions.h>
#include <vector>
#include <map>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace Recon;
using namespace Aurora;
@@ -45,6 +48,15 @@ namespace
Matrix waterTempRefBlock;
};
std::map<std::string, BlockOfTransmissionData> BLOCK_OF_TRANSIMISSIONDARA_BUFFER;
std::mutex CREATE_BUFFER_MUTEX;
std::condition_variable CREATE_BUFFER_CONDITION;
std::mutex PROCESS_BUFFER_MUTEX;
std::condition_variable PROCESS_BUFFER_CONDITION;
int BUFFER_COUNT = 0;
int BUFFER_SIZE = 3;
Matrix prepareAScansForTransmissionDetection(const Matrix& aAscanBlock, const Matrix& aGainBlock)
{
Matrix result = aAscanBlock / repmat(aGainBlock, aAscanBlock.getDimSize(0), 1);
@@ -192,6 +204,58 @@ namespace
}
void getBlockOfTransmissionDataInThread(size_t aIndex, const Matrix& aMp, const Matrix& aMpRef, const Matrix& aSl, const Matrix& aSn, const Matrix& aRlList, const Matrix& aRnList,
const TasTemps& aTasTemps, const Matrix& aExpectedSOSWater, GeometryInfo aGeom, GeometryInfo aGeomRef,
const Matrix& aSnrRmsNoise, const Matrix& aSnrRmsNoiseRef, const MeasurementInfo& aExpInfo, const MeasurementInfo& aExpInfoRef,
const PreComputes& aPreComputes, Parser* aParser, Parser* aParserRef)
{
auto buffer = getBlockOfTransmissionData(aMp, aMpRef, aSl, aSn, aRlList, aRnList, aTasTemps,
aExpectedSOSWater, aGeom, aGeomRef, aSnrRmsNoise, aSnrRmsNoiseRef,
aExpInfo, aExpInfoRef, aPreComputes, aParser, aParserRef);
std::unique_lock<std::mutex> lock(CREATE_BUFFER_MUTEX);
BLOCK_OF_TRANSIMISSIONDARA_BUFFER[std::to_string(aIndex)] = buffer;
std::cout<<"Add: "<<aIndex<<std::endl;
lock.unlock();
PROCESS_BUFFER_CONDITION.notify_one();
}
void createThreadForGetBlockOfTransmissionData(const Matrix& aMotorPos, const Matrix& aMotoPosRef, const Matrix& aSlList, const Matrix& aSnList, const Matrix& aRlList, const Matrix& aRnList,
const TasTemps& aTasTemps, const Matrix& aExpectedSOSWater, GeometryInfo aGeom, GeometryInfo aGeomRef,
const Matrix& aSnrRmsNoise, const Matrix& aSnrRmsNoiseRef, const MeasurementInfo& aExpInfo, const MeasurementInfo& aExpInfoRef,
const PreComputes& aPreComputes, Parser* aParser, Parser* aParserRef)
{
size_t vectorSize = aMotorPos.getDataSize() * (aSlList.getDataSize() / transParams::senderTASSize) * (aSnList.getDataSize() / transParams::senderElementSize);
std::thread speedUpThread[vectorSize];
for(int i=0; i<aMotorPos.getDataSize(); ++i)
{
for(int j=0; j<aSlList.getDataSize() / transParams::senderTASSize; ++j)
{
for(int k=0; k<aSnList.getDataSize() / transParams::senderElementSize; ++k)
{
size_t index = i * (aSlList.getDataSize() / transParams::senderTASSize) * (aSnList.getDataSize() / transParams::senderElementSize) +
j * (aSnList.getDataSize() / transParams::senderElementSize) + k;
Matrix mp = aMotorPos(i).toMatrix();
Matrix mpRef = aMotoPosRef(i).toMatrix();
Matrix sl = aSlList.block(0, transParams::senderTASSize*j, transParams::senderTASSize*j+transParams::senderTASSize - 1);
Matrix sn = aSnList.block(0, transParams::senderElementSize*k, transParams::senderElementSize*k+transParams::senderElementSize - 1);
std::unique_lock<std::mutex> lock(CREATE_BUFFER_MUTEX);
CREATE_BUFFER_CONDITION.wait(lock, []{return BUFFER_COUNT<BUFFER_SIZE;});
++BUFFER_COUNT;
lock.unlock();
speedUpThread[index] = std::thread(getBlockOfTransmissionDataInThread,index,mp,mpRef,sl,sn,aRlList,aRnList,aTasTemps,aExpectedSOSWater,aGeom,aGeomRef,aSnrRmsNoise,aSnrRmsNoiseRef,aExpInfo,aExpInfoRef,aPreComputes,aParser, aParserRef);
}
}
}
for(auto& t:speedUpThread)
{
t.join();
}
}
TransmissionData Recon::getTransmissionData(const Aurora::Matrix& aMotorPos, const Aurora::Matrix& aMotoPosRef, const Aurora::Matrix& aSlList,
const Aurora::Matrix& aSnList, const Aurora::Matrix& aRlList, const Aurora::Matrix& aRnList,
const TempInfo& aTemp, const TempInfo& aTempRef, GeometryInfo& aGeom,
@@ -243,30 +307,16 @@ TransmissionData Recon::getTransmissionData(const Aurora::Matrix& aMotorPos, con
rnBlockTotal = zeros(1,numScans,1);
}
size_t vectorSize = aMotorPos.getDataSize() * (aSlList.getDataSize() / transParams::senderTASSize) * (aSnList.getDataSize() / transParams::senderElementSize);
std::vector<BlockOfTransmissionData> blockOfTransmissionDatas(vectorSize);
std::thread speedUpThread = std::thread(createThreadForGetBlockOfTransmissionData,aMotorPos,aMotoPosRef,aSlList,aSnList,aRlList,aRnList,tasTemps,aTemp.expectedSOSWater,aGeom,aGeomRef,rmsNoise,rmsNoiseRef,aExpInfo,aExpInfoRef,aPreComputes,aParser, aParserRef);
int numData = 0;
int numPossibleScans = 0;
for(int i=0; i<aMotorPos.getDataSize(); ++i)
sched_param sch;
int policy;
pthread_getschedparam(pthread_self(), &policy, &sch);
sch.sched_priority = 50;
if (pthread_setschedparam(pthread_self(), SCHED_FIFO, &sch))
{
#pragma omp parallel for num_threads(24)
for(int j=0; j<aSlList.getDataSize() / transParams::senderTASSize; ++j)
{
for(int k=0; k<aSnList.getDataSize() / transParams::senderElementSize; ++k)
{
size_t index = i * (aSlList.getDataSize() / transParams::senderTASSize) * (aSnList.getDataSize() / transParams::senderElementSize) +
j * (aSnList.getDataSize() / transParams::senderElementSize) + k;
Matrix mp = aMotorPos(i).toMatrix();
Matrix mpRef = aMotoPosRef(i).toMatrix();
Matrix sl = aSlList.block(0, transParams::senderTASSize*j, transParams::senderTASSize*j+transParams::senderTASSize - 1);
Matrix sn = aSnList.block(0, transParams::senderElementSize*k, transParams::senderElementSize*k+transParams::senderElementSize - 1);
blockOfTransmissionDatas[index] = getBlockOfTransmissionData(mp,mpRef,sl,sn,aRlList,aRnList,tasTemps,aTemp.expectedSOSWater,aGeom,aGeomRef,rmsNoise,rmsNoiseRef,aExpInfo,aExpInfoRef,aPreComputes,aParser, aParserRef);
std::cout<<numPossibleScans<<std::endl;
numPossibleScans++;
}
}
std::cerr << "Failed to set thread priority" << std::endl;
}
for(int i=0; i<aMotorPos.getDataSize(); ++i)
@@ -275,22 +325,21 @@ TransmissionData Recon::getTransmissionData(const Aurora::Matrix& aMotorPos, con
{
for(int k=0; k<aSnList.getDataSize() / transParams::senderElementSize; ++k)
{
// Matrix mp = aMotorPos(i).toMatrix();
// Matrix mpRef = aMotoPosRef(i).toMatrix();
// Matrix sl = aSlList.block(0, transParams::senderTASSize*j, transParams::senderTASSize*j+transParams::senderTASSize - 1);
// Matrix sn = aSnList.block(0, transParams::senderElementSize*k, transParams::senderElementSize*k+transParams::senderElementSize - 1);
// auto transmissionBlock = getBlockOfTransmissionData(mp,mpRef,sl,sn,aRlList,aRnList,tasTemps,aTemp.expectedSOSWater,aGeom,aGeomRef,rmsNoise,rmsNoiseRef,aExpInfo,aExpInfoRef,aPreComputes,aParser, aParserRef);
size_t index = i * (aSlList.getDataSize() / transParams::senderTASSize) * (aSnList.getDataSize() / transParams::senderElementSize) +
j * (aSnList.getDataSize() / transParams::senderElementSize) + k;
j * (aSnList.getDataSize() / transParams::senderElementSize) + k;
std::unique_lock<std::mutex> lock(PROCESS_BUFFER_MUTEX);
DetectResult detect = transmissionDetection( blockOfTransmissionDatas[index].ascanBlock, blockOfTransmissionDatas[index].ascanBlockRef,
blockOfTransmissionDatas[index].dists, blockOfTransmissionDatas[index].distRefBlock,
blockOfTransmissionDatas[index].waterTempBlock, blockOfTransmissionDatas[index].waterTempRefBlock,
PROCESS_BUFFER_CONDITION.wait(lock, [index]{return BLOCK_OF_TRANSIMISSIONDARA_BUFFER.find(std::to_string(index)) != BLOCK_OF_TRANSIMISSIONDARA_BUFFER.end();});
lock.unlock();
auto blockData = BLOCK_OF_TRANSIMISSIONDARA_BUFFER[std::to_string(index)];
DetectResult detect = transmissionDetection( blockData.ascanBlock, blockData.ascanBlockRef,
blockData.dists, blockData.distRefBlock,
blockData.waterTempBlock, blockData.waterTempRefBlock,
aTemp.expectedSOSWater[0]);
blockOfTransmissionDatas[index].attData = detect.att;
blockOfTransmissionDatas[index].tofData = detect.tof;
BlockOfTransmissionData transmissionBlock=blockOfTransmissionDatas[index];
blockData.attData = detect.att;
blockData.tofData = detect.tof;
BlockOfTransmissionData transmissionBlock=blockData;
size_t numUsedData = transmissionBlock.senderBlock.getDimSize(1);
if(transParams::applyCalib)
{
@@ -314,9 +363,16 @@ TransmissionData Recon::getTransmissionData(const Aurora::Matrix& aMotorPos, con
cblas_dcopy(numUsedData, transmissionBlock.metaInfos.rnBlock.getData(), 1, rnBlockTotal.getData() + numData, 1);
}
numData += numUsedData;
std::unique_lock<std::mutex> lockBufferCount(CREATE_BUFFER_MUTEX);
BLOCK_OF_TRANSIMISSIONDARA_BUFFER.erase(std::to_string(index));
--BUFFER_COUNT;
lockBufferCount.unlock();
std::cout<<"Remove: "<<index<<std::endl;
CREATE_BUFFER_CONDITION.notify_one();
}
}
}
speedUpThread.join();
double* filterData = Aurora::malloc(tofDataTotal.getDataSize());
for(int i=0;i<tofDataTotal.getDataSize();++i)