那位大哥帮忙改改,谢谢了 ( 积分: 300 )

  • 主题发起人 主题发起人 hrj28
  • 开始时间 开始时间
H

hrj28

Unregistered / Unconfirmed
GUEST, unregistred user!
线程我不会用,我是改以前同事的代码,我加了个存储过程,主要是处理数据月底更新流量或者实时更新流量的.我放的地方不对,他好象是一次执行的,所以他都处理成实时更新了.我现在也找到问题所在了,就在线程的执行里,线程好象是一次执行的.所以我想请那位大哥帮帮忙,看我的存储过程放那合适,谢谢.
//测点对应结构
PTADDRTOSTCD = ^TADDRTOSTCD;
TADDRTOSTCD = record
addr:integer;
channelno:integer;
stcd:string;
sttp:integer;//测点类型,确定入库方式,0 = 河道水位,1=水库水位,2=闸上水位,3=闸下水位
altitude:double;
//初始高程
isssqx:string;//是否实时更新 0实时更新 1 月底更新
从设备采集进来的包数据有很多通道,不同的测点根据类型不同入不同的库,然后根据该测点是月底更新流量还是实时更新流量来调用我的存储过程的.
 
unit CommFunc;
interface
uses
Windows,Classes,Messages,SysUtils,Forms,DB, ADODB,ComObj,Contnrs,DateUtils, QDialogs;
const
MAXBUFSIZE = 8192;
//读取缓冲大小
MAXPROCBUF = 200;
//写入处理缓冲数目最大值
//每个要处理的完整数据包,允许总长不超过512B,一般按照发送包计算,
//头尾16B+一行数据最大3*8 一般情况最多 40B
PKGBUFSIZE = 512;
WM_ADOERROR = WM_USER + 1;
//写入消息
type
//数据读出结构
PTDataReadBuf = ^TDataReadBuf;
TDataReadBuf = record
DataReadLen:Integer;
//写入数据长度
DataProcLen:Integer;// 保留使用
DataBuf:array[0..MAXBUFSIZE - 1] of Byte;//数据缓冲 8KB
PNextBuf:PTDataReadBuf;
//下一缓冲地址
end;
//雨量最后值结构
PTLASTVAL = ^TLASTVAL;
TLASTVAL = record
stcd:integer;
tp:integer;
lastval:double;
end;
//测点对应结构
PTADDRTOSTCD = ^TADDRTOSTCD;
TADDRTOSTCD = record
addr:integer;
channelno:integer;
stcd:string;
sttp:integer;//测点类型,确定入库方式,0 = 河道水位,1=水库水位,2=闸上水位,3=闸下水位
altitude:double;
//初始高程
isssqx:string;//是否实时更新 0实时更新 1 月底更新
end;

PTPackHead = ^TPackHead;
TPackHead = packed record //协议头部结构
wHead:WORD;
//该值应该是0xAA55
bPackLen:Byte;
bFlag:Byte;
RoutePos:Byte;
RouteTable:array[0..4] of Byte;
dwTime:DWORD;
end;

TCommThread = class;
TCommPort = Class(TObject)
public
m_bConnected:boolean;
//是否连接
m_idComDev:THandle;
//设备id
m_nBaud:integer;
//波特率
m_nDataBits:integer;
//数据位
m_nStopBits:integer;
//停止位
m_bDTRDSR:boolean;
//流控制
m_bRTSCTS:boolean;
m_bXONXOFF:boolean;
m_nParity:integer;
//校验位
m_sPort:string;
//端口
FProcfrm:TForm;
//主窗体
function TestConnection():boolean;
function OpenConnection():boolean;
procedure CloseConnection();
function WriteComm(szBuf:PChar;nLen:integer):Integer;
private
{ Private declarations }
FEventShut:Cardinal;
//关闭,写事件
FCommThread:TCommThread;//串口处理线程
function SetupConnection():boolean;
public
{ Public declarations }
constructor Create;
destructor Destroy;
override;
end;

//==监视线程===============================================
TCommThread = class(TThread)
public
//写入函数,阻塞模式,直到写入完成,返回写入字符个数
function WriteComm(pBuf:PChar;nLen:integer):integer;
private
Ffrm:TForm;
//主窗体
FEventShut:Cardinal;
FEventWrite:Cardinal;
FEventWritten:Cardinal;
FidComDev:Cardinal;
FPWriteBuf:PChar;
FnWriteLen,FnWrittenLen:Cardinal;//写入字节和已经写入字节
FPDataReadBuf:PTDataReadBuf;
//读出数据的当前指针 注意,监视线程分配的内存要主线程处理完数据后释放
procedure InitThread;
//初始化线程
procedure TerminateThread;
//结束处理
procedure ReadProc;
//数据读出处理
protected
procedure Execute;
override;
//主执行过程
published
constructor Create(frm:TForm;idComDev,EventShut:Cardinal;PriorityLevel: TThreadPriority);//优先级1~6,设定为3
destructor Destroy;
override;
end;

//=================================================
//==数据入库线程===============================================
TDataProcThread = class(TThread)
public
Ffrm:TForm;
//主窗体,用于接收处理消息
FbConnected : bool;
function WaitEndThread():boolean;
//关闭处理现成
function WriteData(pBuf:PByte;nLen:integer):boolean;//执行一个写入数据
private
FConn_Str:string;//数据库连接字符串
//FPrevCount:Integer;//信号量余数
FProcRes:Cardinal;
//同步信号量
//FPDataPtr:array[0..MAXPROCBUF - 1] of PTDataReadBuf;//定义缓冲指针数目
FDataPtr:TQueue;//使用队列对象,处理先进先出
FCs:TRTLCriticalSection;//如果使用队列,为确保安全,使用临界控制
FConn : TADOConnection;
FQue:TAdoQuery;
FStore:TADOStoredProc;
FLastVal:array of TLASTVAL;
//最后雨量值,用于计算雨量差值
FAddrToStcd:array of TADDRTOSTCD;
//终端通道对应测点表
procedure InitThread;
//初始化线程
procedure TerminateThread;
//结束处理
function ProcData():bool;
function FindLastVal(stcd:integer):integer;//查找雨量站点最后值
function FindSTCD(send_addr,channelno:integer):integer;//查找本系统通道,到其他系统的对应
protected
procedure Execute;
override;
//主执行过程
published
constructor Create(frm:TForm;conn_str:string;PriorityLevel: TThreadPriority);//优先级1~6,设定为3
destructor Destroy;
override;
end;

//=================================================
implementation
uses frm_Main;
constructor TCommPort.Create;
begin
end;
destructor TCommPort.Destroy;
begin
end;
//测试端口能否连接
function TCommPort.TestConnection():boolean;
var
fRetVal:boolean;
CommTimeOuts:TCommTimeouts;
begin
result := false;
m_idComDev := CreateFile(PChar('//./'+ m_sPort), GENERIC_READ or GENERIC_WRITE, 0, 0,
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);
// or FILE_ATTRIBUTE_NORMAL or
if (m_idComDev = INVALID_HANDLE_VALUE) then
exit;
// SetFilePointer( m_idComDev,0,0,2);
// SetupComm(m_idComDev, 40960, 40960);
// PurgeComm(m_idComDev, PURGE_TXABORT or PURGE_RXABORT or PURGE_TXCLEAR or PURGE_RXCLEAR );
CommTimeOuts.ReadIntervalTimeout := 200;//$FFFFFFFF;
CommTimeOuts.ReadTotalTimeoutMultiplier := 0;
CommTimeOuts.ReadTotalTimeoutConstant := 0;
CommTimeOuts.WriteTotalTimeoutMultiplier := 0;
CommTimeOuts.WriteTotalTimeoutConstant := 5000;
fRetVal := SetCommTimeouts(m_idComDev, CommTimeOuts);
if fRetVal then
fRetVal := SetupConnection();
CloseHandle(m_idComDev);
result := fRetVal;
end;

function TCommPort.SetupConnection():boolean;
var
fRetVal:boolean;
dcb:_DCB;
ErrNum:Cardinal;
begin
dcb.DCBlength := sizeof(DCB);
fRetVal := GetCommState(m_idComDev, dcb);
// fRetVal := SetCommState(m_idComDev, dcb);
dcb.BaudRate := m_nBaud;
dcb.ByteSize := m_nDataBits;
case m_nParity of
0: dcb.Parity := NOPARITY;
1: dcb.Parity := ODDPARITY;
2: dcb.Parity := EVENPARITY;
3: dcb.Parity := MARKPARITY;
4: dcb.Parity := SPACEPARITY;
else
{ASSERT(FALSE) error};
end;

case m_nStopBits of
0: dcb.StopBits := ONESTOPBIT;
1: dcb.StopBits := ONE5STOPBITS;
2: dcb.StopBits := TWOSTOPBITS;
else
{ASSERT(FALSE) error};
end;

//dcb.fBinary = TRUE;
dcb.Flags := dcb.Flags or 1;
// else
dcb.Flags := dcb.Flags and $FFFFFFFE;
//--dcb.fParity := TRUE;
dcb.Flags := dcb.Flags or 2;
// else
dcb.Flags := dcb.Flags and $FFFFFFFD;
if m_bRTSCTS then
dcb.Flags := dcb.Flags or 4
else
dcb.Flags := dcb.Flags and $FFFFFFFB;

if m_bDTRDSR then
dcb.Flags := dcb.Flags or 8
else
dcb.Flags := dcb.Flags and $FFFFFFF7;
if m_bDTRDSR then
//DTR_CONTROL_HANDSHAKE
dcb.Flags := dcb.Flags or $20
else
// DTR_CONTROL_ENABLE
dcb.Flags := dcb.Flags or $10;
//else
dcb.Flags := dcb.Flags and $FFFFFFCF;
DTR_CONTROL_DISABLE
if m_bRTSCTS then
dcb.Flags := dcb.Flags or $2000 //RTS_CONTROL_HANDSHAKE
else
dcb.Flags := dcb.Flags or $1000;
//RTS_CONTROL_ENABLE
//else
dcb.Flags := dcb.Flags and $FFFFCFFF;
RTS_CONTROL_DISABLE
if m_bXONXOFF then
dcb.Flags := dcb.Flags or $100
else
dcb.Flags := dcb.Flags and $FFFFFEFF;
//dcb.fInX := dcb.fOutX := m_bXONXOFF;
if m_bXONXOFF then
dcb.Flags := dcb.Flags or $200
else
dcb.Flags := dcb.Flags and $FFFFFDFF;
dcb.XonChar := #$11;//ASCII_XON;
dcb.XoffChar := #$13;//ASCII_XOFF;
dcb.XonLim := 100;
dcb.XoffLim := 100;
result := SetCommState(m_idComDev, dcb);
{
ErrNum := GetLastError();
if ErrNum < 0 then
OutPutDebugString('aa')
else
OutPutDebugString('bb');
}
end;

function TCommPort.OpenConnection():boolean;
var
fRetVal:boolean;
CommTimeOuts:TCommTimeouts;
begin
result := false;
if m_bConnected then
exit;
//如果使用Com1的字符串,则不能打开com11等设备,返回找不到设备错误
m_idComDev := CreateFile(PChar('//./'+ m_sPort), GENERIC_READ or GENERIC_WRITE, 0, 0,
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);
// or FILE_ATTRIBUTE_NORMAL or
if (m_idComDev = INVALID_HANDLE_VALUE) then
begin
m_idComDev := GetLastError();
exit;
end;
SetFilePointer( m_idComDev,0,0,2);

SetupComm(m_idComDev, 40960, 40960);
//接收缓存4K
PurgeComm(m_idComDev, PURGE_TXABORT or PURGE_RXABORT or PURGE_TXCLEAR or PURGE_RXCLEAR );
GetCommTimeouts( m_idComDev, CommTimeOuts );
CommTimeOuts.ReadIntervalTimeout := 200;//$FFFFFFFF;
//这些设置是不是只对ReadFile,WriteFile,起作用?
CommTimeOuts.ReadTotalTimeoutMultiplier := 0;
CommTimeOuts.ReadTotalTimeoutConstant := 0;
CommTimeOuts.WriteTotalTimeoutMultiplier := 0;
//注意:1200bps时5秒最多传输602字节
CommTimeOuts.WriteTotalTimeoutConstant := 5000;
//该值决定串口一次写入的最长时间,对于第速率时,可能没有全部写完数据。或者串口有问题时,也常些不完。
fRetVal := SetCommTimeouts(m_idComDev, CommTimeOuts);

if fRetVal then
fRetVal := SetupConnection();
EscapeCommFunction(m_idComDev, CLRRTS);
if (fRetVal) then
begin
FEventShut := CreateEvent(nil,true,false,nil);
//建立读写线程
FCommThread := TCommThread.Create(FProcfrm,m_idComDev,FEventShut,tpNormal);
FCommThread.Resume;
m_bConnected := TRUE;
end
else
begin
m_bConnected := FALSE;
CloseHandle(m_idComDev);
end;
result := fRetVal;
end;
procedure TCommPort.CloseConnection();
begin
if not m_bConnected then
exit;
FCommThread.Terminate();
SetEvent(FEventShut);
FCommThread.WaitFor;
CloseHandle(FEventShut);
//EscapeCommFunction(m_idComDev, CLRDTR);
SetCommMask(m_idComDev,0);
PurgeComm(m_idComDev, PURGE_TXABORT or PURGE_RXABORT or PURGE_TXCLEAR or PURGE_RXCLEAR);
CloseHandle(m_idComDev);
end;

function TCommPort.WriteComm(szBuf:PChar;nLen:integer):Integer;
begin
if nLen <= 0 then
exit;
result := FCommThread.WriteComm(szBuf,nLen);
end;

//--thread code-----------------------------------------------
constructor TCommThread.Create(frm:TForm;idComDev,EventShut:Cardinal;PriorityLevel:TThreadPriority);
begin
inherited Create(true);
// Create thread suspended
Priority := PriorityLevel;
// Set Priority Level tpNormal is ok
FreeOnTerminate := true;
// Thread Free Itself when terminated
Ffrm := frm;
FidComDev := idComDev;
FEventShut := EventShut;
InitThread;//Synchronize();
// Setup the ProgressBar
//Suspended := false;
// Continue the thread
end;
destructor TCommThread.Destroy;
begin
TerminateThread;
// inherited destroy;
end;
procedure TCommThread.InitThread;
// setup/initialize the ProgressBar
begin
FEventWritten := CreateEvent(nil,true,false,nil);
FEventWrite := CreateEvent(nil,true,false,nil);
FPDataReadBuf := nil;
end;
procedure TCommThread.TerminateThread;
//结束处理
begin
CloseHandle(FEventWritten);
CloseHandle(FEventWrite);
end;
function TCommThread.WriteComm(pBuf:PChar;nLen:integer):integer;
begin
result := -1;
if Terminated then
exit;
FPWriteBuf := pBuf;
FnWriteLen := nLen;
SetEvent(FEventWrite);
WaitForSingleObject(FEventWritten,INFINITE);
Sleep(50);
ReSetEvent(FEventWritten);
result := FnWriteLen;
end;
procedure TCommThread.Execute;
// Main execution for thread
var
osr,osw:OVERLAPPED;
aEventAry:TWOHandleArray;
retval:Cardinal;
ComStat:TCOMSTAT;
dwErrorFlags, dwLength:DWORD;
fReadStat:LongBool;
dwEvtMask:Cardinal;
begin
// if Terminated is true, this loop exits prematurely so the thread will terminate
if (not SetCommMask(FidComDev, EV_RXCHAR or EV_RXFLAG or EV_TXEMPTY)) then
begin
Messagebox(0,'设置Comm失败!','错误',MB_OK or MB_ICONERROR);
exit;
end;
FillMemory(@osr, sizeof(OVERLAPPED), 0);
FillMemory(@osw, sizeof(OVERLAPPED), 0);
osr.hEvent := CreateEvent(0, TRUE, FALSE, 0);
// 手工、无信号
if (osr.hEvent = 0) then
begin
Messagebox(0,'不能创建事件对象!','错误',MB_OK or MB_ICONERROR);
exit;
end;
aEventAry[0] := osr.hEvent;
aEventAry[1] := FEventWrite;
aEventAry[2] := FEventShut;
while Terminated = falsedo
begin
SetCommMask(FidComDev, EV_RXCHAR or EV_RXFLAG {or EV_TXEMPTY});
if not WaitCommEvent(FidComDev,dwEvtMask,@osr) then
begin
If (GetLastError() = ERROR_IO_PENDING) then
begin
retval := WaitForMultipleObjects(3,@aEventAry,false,INFINITE);
If retval <> WAIT_FAILED then
begin
retval := retval - WAIT_OBJECT_0;
case retval of
0://读取处理
begin
ReadProc;
end;
1://写入处理
begin
FnWrittenLen := 0;
FillMemory(@osw, sizeof(OVERLAPPED), 0);
osw.hEvent := FEventWritten;
fReadStat := ClearCommError(FidComDev, dwErrorFlags, @ComStat);
EscapeCommFunction(FidComDev, SETRTS);//打开关闭发射机
Sleep(200);
WriteFile(FidComDev,FPWriteBuf[0],FnWriteLen,FnWrittenLen,@osw);
Longbool(retval) := GetOverlappedResult( FidComDev, osw ,FnWrittenLen,true);
FnWrittenLen := FnWriteLen;
ReSetEvent(FEventWrite);
Sleep(100);
EscapeCommFunction(FidComDev, CLRRTS);
If FnWrittenLen < FnWriteLen then
retval := GetLastError();
//OutPutDebugString('Comm Write Error');
end;
2:
end;
end;
end;
end;
end;
CloseHandle(osr.hEvent);
Sleep(50);
//延时以保证后续代码不发生错误
end;
//读出处理
procedure TCommThread.ReadProc;
var
fReadStat:LongBool;
ComStat:TCOMSTAT;
dwErrorFlags, dwLength:DWORD;
os:OVERLAPPED;
begin
//分配第一个数据缓冲
//注意,内存在线程分配,通过消息传递到主线程,需要住线程来释放内存
fReadStat := ClearCommError(FidComDev, dwErrorFlags, @ComStat);
if ComStat.cbInQue = 0 then
exit;
if FPDataReadBuf = nil then
FPDataReadBuf := AllocMem(sizeof(TDataReadBuf));//此函数初始化分配的内存为0,释放使用FreeMem
if (MAXBUFSIZE - FPDataReadBuf.DataReadLen) < ComStat.cbInQue then
begin
FPDataReadBuf.PNextBuf := AllocMem(sizeof(TDataReadBuf));
FPDataReadBuf := FPDataReadBuf.PNextBuf;
end;

FillMemory(@os, sizeof(OVERLAPPED), 0);
os.hEvent := CreateEvent(nil, TRUE, FALSE, nil);
if not ReadFile(FidComDev,FPDataReadBuf.DataBuf[FPDataReadBuf.DataReadLen],
ComStat.cbInQue,dwLength,@os) then
begin
if GetLastError() = ERROR_IO_PENDING then
begin
if not GetOverlappedResult(FidComDev,os,dwLength,true) then
dwLength := 0;//如果等待失败,则设置读出长度为0
end;
end;
if dwLength > 0 then
begin
FPDataReadBuf.DataReadLen := FPDataReadBuf.DataReadLen + dwLength;
PostMessage(Ffrm.Handle,WM_COMMNOTIFY,Integer(FPDataReadBuf),dwLength);
end;
end;
//--thread code end-------------------------------------------
//读线程读出-〉主线程分析-〉写线程写库
//--write thread code-----------------------------------------------
constructor TDataProcThread.Create(frm:TForm;conn_str:string;PriorityLevel:TThreadPriority);
begin
FbConnected := false;
FProcRes := CreateSemaphore(nil,0,MAXPROCBUF + 1,'_CJSJ_');
if FProcRes = 0 then
begin
Application.MessageBox('无法创建对象!','错误',MB_ICONERROR);
exit;
end;
//FillMemory(@FPDataPtr[0],MAXPROCBUF*sizeof(Pointer),0);
FDataPtr := TQueue.Create();
InitializeCriticalSection(FCs);
Ffrm := frm;
FConn_Str := conn_str;
FConn := nil;
InitThread;//Synchronize();
// Setup the ProgressBar
//不管是否成功,线程必须建立,否则后续操作无法完成
inherited Create(true);
// Create thread suspended
if FbConnected then
//仅当连接成功才建立线程
begin
Priority := PriorityLevel;
// Set Priority Level tpNormal is ok
FreeOnTerminate := true;
// 线程结束后(waitfor返回)后自动释放线程对象,不用显式释放
//FreeOnTerminate:=false;
end;
end;
destructor TDataProcThread.Destroy;
begin
TerminateThread;
// inherited destroy;
end;
function TDataProcThread.WaitEndThread():boolean;
var i,cnt:integer;
begin
if not FbConnected then
exit;
//释放多个信号,同时置标志为结束
Terminate();
//确定有多少没处理的 全部释放掉
//for i := 0 to MAXPROCBUF - 1do
// if FPDataPtr <> nil then
Inc(cnt);
cnt := FDataPtr.Count + 1;
ReleaseSemaphore(FProcRes,cnt,0);
WaitFor();
//如果线程释放太快,则这个waitfor会报句炳无效错误
//最好在线程结束最后增加sleep来延时,以便可以执行到WaitFor
end;
procedure TDataProcThread.InitThread;
// setup/initialize the ProgressBar
var i:integer;
Que:TADOQuery;
begin
try
FConn := TADOConnection.Create(nil);
FConn.CommandTimeout := 0;
FConn.CursorLocation := clUseClient;
FConn.ConnectionString := FConn_Str;
FConn.Open;
FQue := TADOQuery.Create(nil);
FQue.Connection := FConn;
FStore:= TADOStoredProc.Create(nil);
FStore.Connection := FConn;
SetLength(FLastVal,0);
//最好给个0值,以免出错
//测点对应表
SetLength(FAddrToStcd,0);
FQue.SQL.Text := 'select * from AddrToStcd';
FQue.Open;
if FQue.RecordCount > 0 then
begin
SetLength(FAddrToStcd,FQue.RecordCount);
for i := 0 to FQue.RecordCount - 1do
begin
FAddrToStcd.addr := FQue.FieldByName('addr').Value * 100 + FQue.FieldByName('channelno').Value;
FAddrToStcd.channelno := FQue.FieldByName('channelno').Value;
FAddrToStcd.stcd := FQue.FieldByName('stcd').AsString;
FAddrToStcd.sttp := FQue.FieldByName('sttp').Value;
if not FQue.FieldByName('altitude').IsNull then
FAddrToStcd.altitude := FQue.FieldByName('altitude').Value
else
FAddrToStcd.altitude := 0;
FAddrToStcd.isssqx:=FQue.FieldByName('isssqx').AsString;
FQue.Next;
end;
end;
FQue.Close;

FbConnected := true;
except
on E:EOleExceptiondo
begin
Application.MessageBox(PChar(e.Message),' 连接错误',MB_ICONERROR);
exit;
end;
on E:EADOErrordo
begin
FConn.Free;
Application.MessageBox(PChar(E.Message),'连接错误',MB_ICONERROR);
end;
on E: Exceptiondo
begin
FConn.Free;
Application.MessageBox(PChar(E.Message),'连接错误',MB_ICONERROR);
end;
end;
end;
procedure TDataProcThread.TerminateThread;
//结束处理
begin
DeleteCriticalSection(FCs);
if FConn <> nil then
begin
if FConn.Connected then
FConn.Close;
FConn.Free;
//if FQue.State =
if FConn <> nil then
FQue.Free;
end;
FbConnected := false;
if FProcRes <> 0 then
CloseHandle(FProcRes);
//结束前最好确保队列是空的...
if FDataPtr <> nil then
FDataPtr.Free;
end;
function TDataProcThread.WriteData(pBuf:PByte;nLen:integer):boolean;
var i:integer;
begin
result := false;
//注意:如果线程发生没有被handle的异常而退出,则下面的判断无法检测
if Terminated then
exit;
//如果线程没有运行或者正在停止过程中,则不在处理
//注意,要保证先写入的先处理所以这里要采用追加到最后的方式。
if FDataPtr.Count < MAXPROCBUF then
begin
EnterCriticalSection(FCs);
FDataPtr.Push(Pointer(pBuf));
LeaveCriticalSection(FCs);
ReleaseSemaphore(FProcRes,1,0);
result := true;
end;

//如果缓冲不够之后怎么处理呢?不过这种情况可能不会出现
//暂时返回false,丢弃该数据
end;
procedure TDataProcThread.Execute;
// Main execution for thread
var
bProcOK:bool;
begin
bProcOK := true;
InitializeCriticalSection(FCs);
while Terminated = falsedo
begin
bProcOK := true;
while bProcOKdo
//增加一个内部循环,直到没有数据处理了才推出。
begin
WaitForSingleObject(FProcRes,INFINITE);
bProcOK := ProcData;
end;
end;
Sleep(50);//延时以保证后续代码不发生错误
end;

//时间转换
function GetDateTime(dtSecond:LongWord):TDateTime;
var dt:TDateTime;
begin
dt := EncodeDateTime(1980,01,01,00,00,00,00);
result := IncSecond(dt,dtSecond);
end;

//如果当前有数据处理,返回真,如果无数据处理返回假
function TDataProcThread.ProcData():bool;
var
SqlStr,tmpstr:string;
pData:PTPackHead;
pMsgBuf:PChar;
pPtr:PByteArray;
send_addr:Byte;
i,lastval,stcd:integer;
wdVal:WORD;
dwdVal:DWORD;
LJRUN:INTEGER;
showstcd:string;
const TbNameAry:array[0..4] of string =
('insert ST_RIVER_R(STCD,TM,Z) values(:STCD,:TM,:Z)',
'insert ST_RSVR_R(STCD,TM,RZ) values(:STCD,:TM,:RZ)',
'insert ST_WAS_R(STCD,TM,UPZ) values(:STCD,:TM,:UPZ)',
'insert ST_WAS_R(STCD,TM,DWZ) values(:STCD,:TM,:DWZ)',
'insert RTRUN (STCD,DA,DI,TP) values(:STCD,:DA,:DI,:TP)');
//HE
// pPtr:PByte;
begin
showstcd:='';
result := false;
EnterCriticalSection(FCs);
if FDataPtr.Count = 0 then
begin
LeaveCriticalSection(FCs);
exit;
end;
pData := PTPackHead(FDataPtr.Pop);
LeaveCriticalSection(FCs);
//数据入库处理
try
send_addr := pData.RouteTable[0];
pPtr := @(PByteArray(pData)[14]);
/////////////////////////
for i := 0 to ((pData.bPackLen - 16) div 3) - 1do
//单独计算19号虚拟通道的累计值
begin
if pPtr[3*i]=19 then
begin
LJRUN := PWORD(@pPtr[3*i + 1])^ ;
end;
end;
////////////////////////////
for i := 0 to ((pData.bPackLen - 16) div 3) - 1do
//3个字节的整数倍,多出来的不要了
begin
//如果是雨量数据,则是最后一项,并且需要做特殊处理 ,然后退出循环
if pPtr[3*i]=19 then
break;
//虚拟的通道,该值为终端雨量累加器的后16位。 2005。5。21修改
stcd := FindSTCD(send_addr,pPtr[3*i]);
if stcd = -1 then
begin
//没有对应的测点
continue;
end;

wdVal := PWORD(@pPtr[3*i + 1])^;//PWORD(DWORD(pData) + 15)^;
//增加一个对无效值判断,负的值不入库
if (wdVal and $8000) <> 0 then
continue;
//注:阿拉尔不是用电压和雨量
if (pPtr[3*i] = 1) then
begin
continue;
FQue.Parameters[0].Value := stcd;//send_addr * 32 + 221;
//&quot;站点号*32 + 227 = 电压通道
FQue.Parameters[1].Value := wdVal / 1000.0;
//电压是毫伏。
FQue.Parameters[2].Value := 0;
FQue.Parameters[3].Value := 99;
end;
if (pPtr[3*i] = 18) then
begin
// continue;
//雨量占18,19两个通道,前者为差值,后者为累加器长整型的后16位。
//lastval := FindLastVal(stcd);
if (wdVal = 0) then
break;
//0值不入库
FQue.SQL.Clear;
FQue.SQL.Text:=TbNameAry[FAddrToStcd[stcd].sttp];
FQue.Parameters[0].Value := FAddrToStcd[stcd].stcd;//send_addr * 32 + 243;//&quot;站点号*32 + 243
FQue.Parameters[1].Value :=NOW();
FQue.Parameters[2].Value := wdVal*FAddrToStcd[stcd].altitude;
FQue.Parameters[3].Value :=LJRUN*FAddrToStcd[stcd].altitude;
showstcd:=showstcd+','+FAddrToStcd[stcd].stcd;
end
else
begin
FQue.SQL.Clear;
FQue.SQL.Text := TbNameAry[FAddrToStcd[stcd].sttp];
FQue.Parameters[0].Value := FAddrToStcd[stcd].stcd;//send_addr * 32 + pPtr[3*i] + 225;
//&quot;站点号*32 + 225 = 电压通道
FQue.Parameters[1].Value := Now();
FQue.Parameters[2].Value := FAddrToStcd[stcd].altitude + wdVal / 1000.0;
showstcd:=showstcd+FAddrToStcd[stcd].stcd;
if FAddrToStcd[stcd].isssqx='1' then
begin
//月底更新
FStore.ProcedureName:='UPDATEQ2';
FStore.Prepared;
FStore.Parameters.Clear;
FStore.Parameters.CreateParameter('style',ftString,pdUnknown,1,'1');
Try
FStore.ExecProc;
except
ShowMessage('执行存储过程异常');
end;
end
else
//实时更新
begin
FStore.ProcedureName:='UPDATEQ2';
FStore.Prepared;
FStore.Parameters.Clear;
FStore.Parameters.CreateParameter('style',ftString,pdUnknown,1,'0');
Try
FStore.ExecProc;
except
ShowMessage('执行存储过程异常');
end;
end;
end;

if FQue.ExecSQL = 0 then
begin
//没有执行成功
tmpstr := '错误:测点[' + IntToStr(send_addr*100 + pPtr[3*i]) + ']数据写入没有成功!';
pMsgBuf := AllocMem(length(tmpstr)+1);
CopyMemory(pMsgBuf,PChar(tmpstr),length(tmpstr));
PostMessage(Ffrm.Handle,WM_ADOERROR,Cardinal(pMsgBuf),0);
end;

end;
frm_main.stcdno:=showstcd;
//////////////////////////////////////////////
{ FQue.SQL.Text := 'insert A11Czzk1(SurveyPointNo,SurveyTime,MiddleValue1,SurveyFlag) values(:SurveyPointNo,:SurveyTime,:MiddleValue1,0)';
send_addr := pData.RouteTable[0];
pPtr := @(PByteArray(pData)[14]);
for i := 0 to ((pData.bPackLen - 16) div 3) - 1do
//3个字节的整数倍,多出来的不要了
begin
FQue.Parameters[0].Value := IntToStr(send_addr) + '-' + IntToStr(pPtr[3*i]);//&quot;站点号-通道号码&quot;格式
FQue.Parameters[1].Value := GetDateTime(pData.dwTime);
// pPtr := PByte(DWORD(pData) + 14);
FQue.Parameters[2].Value := PWORD(@pPtr[3*i + 1])^;//PWORD(DWORD(pData) + 15)^;
FQue.ExecSQL;
// if FQue.ExecSQL > 0 then
// begin
//sql执行成功
// end;

end;
}
////////////////////////////////////////////////

//注意要在写入后释放内存,(主线程读取数据后分配内存,这里释放)
FreeMem(pData,PKGBUFSIZE);
except
on E:EOleExceptiondo
begin
pMsgBuf := AllocMem(length(E.Message)+1);//必须多分配一个,用作空结尾符,
//否则在字符串长度为4的整数倍时会出错,因为内存分配实际是按照4字节对齐的
CopyMemory(pMsgBuf,PChar(E.Message),length(E.Message));
PostMessage(Ffrm.Handle,WM_ADOERROR,Cardinal(pMsgBuf),0);
FreeMem(pData,PKGBUFSIZE);
//注意错误之后也要释放内存,
end;
on E:EADOErrordo
begin
pMsgBuf := AllocMem(length(E.Message)+1);
CopyMemory(pMsgBuf,PChar(E.Message),length(E.Message));
PostMessage(Ffrm.Handle,WM_ADOERROR,Cardinal(pMsgBuf),0);
FreeMem(pData,PKGBUFSIZE);
//注意错误之后也要释放内存,
end;
on E: Exceptiondo
begin
pMsgBuf := AllocMem(length(E.Message)+1);
CopyMemory(pMsgBuf,PChar(E.Message),length(E.Message));
PostMessage(Ffrm.Handle,WM_ADOERROR,Cardinal(pMsgBuf),0);
FreeMem(pData,PKGBUFSIZE);
//注意错误之后也要释放内存,
end;
end;
result := true;
end;

function TDataProcThread.FindLastVal(stcd:integer):integer;
var i:integer;
begin
for i := Low(FlastVal) to High(FlastVal)do
begin
if FlastVal.stcd = stcd then
begin
result := i;
exit;
end;
end;
result := -1;
end;

function TDataProcThread.FindSTCD(send_addr,channelno:integer):integer;
var i:integer;
begin
result := send_addr * 100 + channelno;
for i := Low(FAddrToStcd) to High(FAddrToStcd)do
begin
if FAddrToStcd.addr = result then
begin
result := i;//FAddrToStcd.stcd;
exit;
end;
end;
result := -1;
end;
//--write thread code end-------------------------------------------
end.
 
[?]不会吧?兄弟,拿这么长的代码让人阅读……
 
不懂
這麼多
 

Similar threads

D
回复
0
查看
2K
DelphiTeacher的专栏
D
D
回复
0
查看
1K
DelphiTeacher的专栏
D
S
回复
0
查看
1K
SUNSTONE的Delphi笔记
S
S
回复
0
查看
911
SUNSTONE的Delphi笔记
S
D
回复
0
查看
1K
DelphiTeacher的专栏
D
后退
顶部