// 不完整, 仅供参考 (incomplete excerpt, for reference only).
{ Reads one message from the disk queue.

  Runs qrySelectByDestination with FPattern as its only parameter and takes the
  first row of the result. The header columns are copied into aHeader and the
  blob column is written to aData starting at offset 0. Everything happens
  inside a single FDatabase transaction.

  Parameters:
    aData      - stream that receives the message payload; must be assigned.
    aHeader    - receives No, Source, Destination, Priority and TimeToLive.
                 TimeToLive is the number of whole seconds between Now and the
                 stored death time.
    AutoCommit - True:  the row is deleted through qryDeleteByNo.
                 False: the row is kept; qryUpdateByNo sets its state to
                        ReadNotCommit and its session to FSession.

  Raises:
    EQueueEmpty when the query returns no rows (not logged).
    Any other exception is logged through TLogFile and re-raised.
    In both cases the transaction is rolled back first.

  NOTE(review): aData is not truncated before the payload is written, so bytes
  of earlier content beyond the payload length stay in the stream. }
procedure TMessageQueue.Get(aData: TStream; var aHeader: TMessageHeader; AutoCommit: Boolean=True);
var
  DeathTime: TDateTime;
begin
  Assert(Assigned(aData));
  FDatabase.StartTransaction;
  try
    // Select the candidate row(s) for this queue's destination pattern.
    qrySelectByDestination.Close;
    qrySelectByDestination.Params[0].AsString := FPattern; // Destination
    qrySelectByDestination.Open;
    try
      if qrySelectByDestination.IsEmpty then
        raise EQueueEmpty.Create('Queue is Empty');

      aHeader.No          := qrySelectByDestination.Fields[0].AsInteger;  // No
      aHeader.Source      := qrySelectByDestination.Fields[1].AsString;   // Source
      aHeader.Destination := qrySelectByDestination.Fields[2].AsString;   // Destination
      aHeader.Priority    := qrySelectByDestination.Fields[3].AsInteger;  // Priority
      DeathTime           := qrySelectByDestination.Fields[4].AsDateTime; // DeathTime

      // TDateTime differences are in days; convert to whole seconds.
      aHeader.TimeToLive := Trunc((DeathTime - Now) * 24 * 3600);

      // Payload is written from the start of the caller's stream.
      aData.Seek(0, soFromBeginning);
      TBlobField(qrySelectByDestination.Fields[5]).SaveToStream(aData); // DataField
    finally
      // Close the dataset on every path, including EQueueEmpty and read
      // errors, so it is never left open after a failure.
      qrySelectByDestination.Close;
    end;

    if AutoCommit then
    begin
      // Auto-commit: delete the record that has just been read.
      qryDeleteByNo.Params[0].AsInteger := aHeader.No; // No
      qryDeleteByNo.ExecSQL;
    end
    else
    begin
      // No auto-commit: mark the record as "read, not committed" for this session.
      qryUpdateByNo.Params[0].AsString  := ReadNotCommit; // State
      qryUpdateByNo.Params[1].AsInteger := FSession;      // Session
      qryUpdateByNo.Params[2].AsInteger := aHeader.No;    // No
      qryUpdateByNo.ExecSQL;
    end;

    FDatabase.Commit;
  except
    on E: Exception do
    begin
      // An empty queue is an expected condition, so it is not logged.
      if not (E is EQueueEmpty) then TLogFile.WriteLn('读取数据失败. 错误信息=%s ', [E.Message]);
      FDatabase.Rollback;
      raise;
    end;
  end;
end;
{ Appends one message to the disk queue.

  Reads the current maximum record number through qrySelectMaxNo, adds one to
  it, and inserts a new row through qryInsert carrying the header values and
  the full content of aData (read from offset 0). Everything happens inside a
  single FDatabase transaction.

  Parameters:
    aData      - stream holding the message payload; must be assigned.
    aHeader    - supplies Source, Destination, Priority and TimeToLive
                 (seconds; the stored death time is Now + TimeToLive).
                 It is passed by value, so the No assigned here is not
                 visible to the caller.
    AutoCommit - True:  the row is stored with state Commited and session 0.
                 False: the row is stored with state WriteNotCommit and
                        session FSession.

  Raises:
    Any exception is logged through TLogFile, the transaction is rolled back,
    and the exception is re-raised. }
procedure TMessageQueue.Put(aData: TStream; aHeader: TMessageHeader; AutoCommit: Boolean=True);
var
  States : Char;
  Session: Cardinal;
begin
  Assert(Assigned(aData));
  FDatabase.StartTransaction;
  try
    if AutoCommit then
    begin
      // Auto-commit: the record is stored as already committed.
      States  := Commited;
      Session := 0;
    end
    else
    begin
      // No auto-commit: the record is stored as "written, not committed"
      // and tagged with this queue's session.
      States  := WriteNotCommit;
      Session := FSession;
    end;

    // Next record number = current maximum + 1.
    // Close first so a dataset left open earlier is re-executed, not reused.
    qrySelectMaxNo.Close;
    qrySelectMaxNo.Open;
    try
      aHeader.No := qrySelectMaxNo.Fields[0].AsInteger;
    finally
      // Close the dataset even if reading the field fails.
      qrySelectMaxNo.Close;
    end;
    Inc(aHeader.No);

    // Insert the message into the disk queue.
    qryInsert.Params[0].AsInteger  := aHeader.No;                         // No
    qryInsert.Params[1].AsInteger  := Session;                            // Session
    qryInsert.Params[2].AsString   := aHeader.Source;                     // Source
    qryInsert.Params[3].AsString   := aHeader.Destination;                // Destination
    qryInsert.Params[4].AsSmallInt := aHeader.Priority;                   // Priority
    qryInsert.Params[5].AsDateTime := IncSecond(Now, aHeader.TimeToLive); // DeathTime
    qryInsert.Params[6].AsString   := States;                             // State
    // The payload is read from the start of the caller's stream.
    aData.Seek(0, soFromBeginning);
    qryInsert.Params[7].LoadFromStream(aData, ftBlob);                    // DataField
    qryInsert.ExecSQL;

    FDatabase.Commit;
  except
    on E: Exception do
    begin
      TLogFile.WriteLn('写数据失败. 错误信息=%s ', [E.Message]);
      FDatabase.Rollback;
      raise;
    end;
  end;
end;