//三层应用程序中的回调功能
//还有待完善,但已基本可用
unit uRDM;
{$WARN SYMBOL_PLATFORM OFF}
interface
uses
Windows, Messages, SysUtils, Classes, ComServ, ComObj, VCLCom, DataBkr,
DBClient, cpt_TLB, StdVcl, AxCtrls, ActiveX, uEventSink, Variants, Dialogs;
const
WM_SetCallBack = WM_User + $10;
WM_BroadCast = WM_User + $20;
type
POleVariant = ^OleVariant;
tagCltData = record
pSink: POleVariant;
id: TGUID; //可以保证在客户端异常断开后能正常清理FSinkList的内容
index: integer;
end;
TCltData = tagCltData;
PCltData = ^TCltData;
tagCallBackData = record
pIStm: Pointer; //IStream
pParam: Pointer; //指向参数的指针
end;
TCBkData = tagCallBackData;
PCBkData = ^TCBkData;
TCPCObj = class(TAutoObject, ICPC)
private
FSinkList: TList;
FMtx: Cardinal;
protected
procedure WMSetCallBack(var message: TMsg);
procedure WMBroadCast(var message: TMsg);
procedure SetCallBack(EventSink: OleVariant; var index: integer; id: TGUID); safecall;
procedure Trigger;
procedure CancelCallBack(const id: TGUID); overload;
procedure CancelCallBack(const index: integer); overload;
procedure Lock;
procedure UnLock;
public
constructor Create;
destructor Destroy; override;
end;
TCPTest = class(TRemoteDataModule, ICPSrv)
procedure RemoteDataModuleDestroy(Sender: TObject);
procedure RemoteDataModuleCreate(Sender: TObject);
private
{ Private declarations }
FCPC: TCPCObj;
FID: TGUID;
protected
class procedure UpdateRegistry(Register: Boolean; const ClassID, ProgID: string); override;
procedure MakeCall; safecall;
procedure SetCallBack(EventSink: OleVariant; var Index: Integer);
safecall;
public
{ Public declarations }
constructor Create(AOwner: TComponent); override;
end;
function ThrdFunc(P: Pointer): longint; stdcall;
function CallBack(P: Pointer): longint; stdcall;
var
g_CPC: TCPCObj;
g_Thrd: THandle;
g_ThrdID: Cardinal;
implementation
{$R *.DFM}
function CallBack(P: Pointer): longint;
var
data: PCBkData;
obj: IDispatch;
begin
CoInitialize(nil);
try
data := PCBkData(P);
OleCheck(CoGetInterfaceAndReleaseStream(
IStream(data.pIStm), IDispatch, obj)
);
OleVariant(obj).Done;
finally
CoUnInitialize;
end;
end;
function ThrdFunc(P: Pointer): longint;
var
lpMsg: TMsg;
begin
CoInitialize(nil);
try
g_CPC := TCPCObj.Create;
while (GetMessage(lpMsg, 0, 0, 0)) do
begin
case lpMsg.message of
WM_SetCallBack: g_CPC.WMSetCallBack(lpMsg);
WM_BroadCast: g_CPC.WMBroadCast(lpMsg);
end;
// DispatchMessage(lpMsg);
end;
FreeAndNil(g_CPC);
finally
CoUninitialize;
end;
end;
constructor TCPTest.Create(AOwner: TComponent);
begin
inherited;
FCPC := g_CPC;
end;
class procedure TCPTest.UpdateRegistry(Register: Boolean; const ClassID, ProgID: string);
begin
if Register then
begin
inherited UpdateRegistry(Register, ClassID, ProgID);
EnableSocketTransport(ClassID);
EnableWebTransport(ClassID);
end else
begin
DisableSocketTransport(ClassID);
DisableWebTransport(ClassID);
inherited UpdateRegistry(Register, ClassID, ProgID);
end;
end;
procedure TCPTest.MakeCall;
begin
// FCPC.Trigger;
PostThreadMessage(g_ThrdID, WM_BroadCast, 0, 0);
end;
procedure TCPTest.SetCallBack(EventSink: OleVariant; var Index: Integer);
var
p: Pointer;
sink: IDispatch;
begin
// FCPC.SetCallBack(EventSink, Index, FID);
p := nil; //局域变量,必须初始化
sink := IUnknown(EventSink) as IDispatch;
OleCheck(CoMarshalInterThreadInterfaceInStream(
IDispatch,
sink,
IStream(p))
);
Index := Integer(Pointer(PGUID(@FID)));
//因为是邮寄,返回的Index无效
PostThreadMessage(g_ThrdID, WM_SetCallBack, Integer(p), Index);
end;
{ TCPCObj }
procedure TCPCObj.CancelCallBack(const id: TGUID);
var
I: integer;
pData: PCltData;
s1, s2: string;
begin
for I := 0 to FSinkList.Count - 1 do
begin
if (FSinkList = nil) then Continue;
pData := PCltData(FSinkList);
s1 := GUIDToString(pData^.id);
s2 := GUIDToString(id);
if (CompareText(s1, s2) = 0) then //应该有更好的方法
begin
CancelCallBack(I);
break;
end;
end;
end;
procedure TCPCObj.CancelCallBack(const index: integer);
var
pData: PCltData;
begin
Lock;
if (index <= -1) or (index >= FSinkList.Count) or (FSinkList[index] = nil) then exit;
pData := PCltData(FSinkList[index]);
// pData^.pSink^._Release; 如果客户已经断开了则不用
Dispose(pData^.pSink);
Dispose(pData);
FSinkList[index] := nil;
if (index = FSinkList.Count - 1) then
FSinkList.Delete(index);
UnLock;
end;
constructor TCPCObj.Create;
begin
inherited;
FSinkList := TList.Create;
FMtx := CreateMutex(nil, false, nil);
end;
destructor TCPCObj.Destroy;
var
I: integer;
begin
for I := 0 to FSinkList.Count - 1 do
begin
CancelCallBack(I);
end;
FreeAndNil(FSinkList);
CloseHandle(FMtx);
inherited;
end;
procedure TCPCObj.Lock;
begin
WaitForSingleObject(FMtx, INFINITE);
end;
procedure TCPCObj.SetCallBack(EventSink: OleVariant; var index: integer; id: TGUID);
var
Sink: ICPClt;
I: integer;
pData: PCltData;
// pSink: POleVariant;
begin
Lock;
I := 0;
// New(pSink);
New(pData);
New(pData^.pSink);
pData^.pSink^ := EventSink;
pData^.id := id;
Sink := ICPClt(IDispatch(EventSink));
while I < FSinkList.Count do
begin
if FSinkList = nil then
Break
else
Inc(I);
end;
pData^.index := I;
if I >= FSinkList.Count then
begin
FSInkList.Add(Pointer(pData));
//FSinkList.Add(Pointer(pSink))
end else
begin
FSInkList.Add(Pointer(pData));
//FSinkList := Pointer(pSink);
end;
Sink._AddRef; //可有可无, 不是很明白
Index := I;
UnLock;
end;
procedure TCPCObj.Trigger;
var
p: Pointer;
EventSink: POleVariant;
I: integer;
PData: PCBkData;
thrdID: Cardinal;
Sink: IDispatch;
begin
for I := 0 to FSinkList.Count - 1 do
begin
if (FSinkList = nil) then Continue;
p := FSinkList;
EventSink := PCltData(p).pSink;
// EventSink^.Done; //客户端实现的接口
New(PData);
sink := IUnknown(EventSink^) as IDispatch;
PData^.pIStm := nil;
OleCheck(CoMarshalInterThreadInterfaceInStream(
IDispatch,
Sink,
IStream(PData^.pIStm))
);
// PData := PCBkData(@CBData);
CreateThread(nil, 0, @CallBack, PData, 0, thrdID);
end;
end;
procedure TCPTest.RemoteDataModuleDestroy(Sender: TObject);
begin
FCPC.CancelCallBack(FID);
end;
procedure TCPTest.RemoteDataModuleCreate(Sender: TObject);
begin
CreateGUID(FID);
end;
procedure TCPCObj.UnLock;
begin
ReleaseMutex(FMtx);
end;
procedure TCPCObj.WMBroadCast(var message: TMsg);
begin
Trigger;
end;
procedure TCPCObj.WMSETCALLBACK(var message: TMsg);
var
p: Pointer;
obj: IDispatch;
index: integer;
id: TGUID;
begin
p := Pointer(message.wParam);
OleCheck(CoGetInterfaceAndReleaseStream(
IStream(p), IDispatch, obj)
);
id := PGUID(Pointer(message.lParam))^;
SetCallBack(obj, index, id);
message.lParam := index;
end;
initialization
TComponentFactory.Create(ComServer, TCPTest,
Class_CPTest, ciMultiInstance, tmApartment);
TAutoObjectFactory.Create(ComServer, TCPCObj, Class_CPC,
ciSingleInstance, tmApartment);
// g_CPC := TCPCObj.Create;
g_Thrd := CreateThread(nil, 0, @ThrdFunc, nil, 0, g_ThrdID);
finalization
PostThreadMessage(g_ThrdID, WM_Quit, 0, 0);
end.