提高分布式多层系统的效率之二(Pooler)

  • 主题发起人 主题发起人 import
  • 开始时间 开始时间
I

import

Unregistered / Unconfirmed
GUEST, unregistred user!
提高分布式多层系统的效率之二(Pooler) 原创 (小小->爱被下载中……)
提高分布式多层系统的效率是一个永远不变的主题,有很多制约分布式系统的效率,如:RDBMS、数据库引擎、代码效率、各种Pooling、开发设计等……
在此,我就Pooler说一说,希望对大家有用;
有那些Pooler?Pooler是什么?那儿可以用Pooler?Pooler有什么用?怎么用Pooler?当一系例的Pooler问题摆在你的面前时,你如何下手?从那儿下手?但这些又是我们不得不面对的待解决的问题;唯一的办法只能是理解、解决它些问题;
Pooler是池子的意思,可以这样理解,无论是那种Pooler,它都为用户提供了一个相当于容器的作用的连接,比如一个用户可能需要连接远程数据库中特定的表,当他花费了一定时间进行连接、使用完毕之后,他是要释放这个连接呢?还是将这个连接放在某一个容器里边呢?不可否认,在多层中,连接操作是一个费时操作,特别是对于大型数据库,要进行各种检查等操作,那么,是不是每一个用户都需要这个步骤呢?这儿就提到了Pooler这个概念;当用户A将操作都执行完毕之后,他如果释放了他的连接,那么这个连接就只是被用户A一个人用过,其它用户没有机会再次用这个连接,还要继续进行连接操作,继续的浪费时间;而如果用户A能将这个连接保存下来,供以后可以用的着这个连接的用户使用的话,岂不是很好吗?此是Pooler就充当了这个角色,所以,我们可以将Pooler在这儿理解成存放连接的池子或容器;
Pooler又可以分为那些?分类所然没有被完全的规格化,但是可以将它罗列出来对我们有用的一些,比如: Session Pooling,Object Pooling,Connection Pooling(前边我们说过),ResourcePooling,DataPooling等,每种不同的Pooling有不同的作用,但它们实现的功能大至相同,它们可以提供方便的连接以节省用户时间,而且可以用我们曾说过的COM or DCOM的TimeOut来判断用户是否超时等;
只要有三层的地方,都应该用Pooling,甚至在C/S里也提倡用Pooling,Connection Pooling就是最好的例子;当然,需要合理的应用Pooling,不然会造成接口混乱等现象;
Pooler 有什么作用?在说什么是Pooler时,我们已经可以感觉的出来它的作用了;那么如何用Pooler呢?提到如何用Pooler时,我想大家如果有时间应该看一看我关边的关于COM DCOM方面的介绍,因为用Pooler 或者叫pooling就必须要用到接口,而前边我曾对接口给过很详细的说明,如果你不能理解接口的话,你在这儿也将无存下手;用Pooling提高分布式系统的效率的确是一个直接的办法,特别是当用户增加时;不管是MTS or DCOM or CORBA都需要用到Pooling技术来使效率提高,我将实现原理介绍一下,后边我会对一个例子进行分析以便于我们更好的交流;一般而言,用Pooling都需要用到协调接口类CoClass(前边的贴子,我曾说过这个类);我们可以将它理解为一个类,一个实现接口的类,而Pooling就要用到它!当客户端需要数据时,那以应用程序服务器就会应用CoClass所实现的接口来和应用服务器本身的接口进行交互;而且所有连接、线程控制等操作都由你新建的CoClass所负责,之后才通过应用服务本身的接口进行数据的存取,步骤我理解是这样的:当然用户想建立某一个连接或是某一项费时操作时,而正好它现在没有和应用服务器进行通讯,这样的话,它就希望以最快的速度和应用服务器进行通讯,但步骤往往不是它直接和应用服务器进行通讯,而是它需要和CoClass的实现的接口进行作用,当这个辅助类(CoClass)给这个用户分配了线程、建立了连接等一些资源时,它才可以和应用服务进行通讯,你会不会觉的这是多此一举呢?不,应用这个辅助类可以检测是否有空闲的连接来让这个用户进行连接,是否有可用的其它资源让这个用户进行操作,这样才会正真的节省客户的时间,同时也大大的降底了服务器的担子;这便是它的作用;
而至于如何建立进程、如果分配资源,又如何回收,下边有例子可供参考;
我们来用一个例子来说;(你可以参考Delphi/Demo/Midas/Pooler这个例子 );
//****************************************************************************//
// Pool Code(池子代码) //
// //
// 作 者: 小小 //
// 调试时间:2002-10-31 15:41 //
// 调试环境:Windows 2000 + Delphi6.0; //
// 参考资料:暂无 //
// 交流地点:www.nxit.net/bbs or www.nxit.net/bbs //
//****************************************************************************//
服务端窗体///只是为了让操作者更明确一些;
unit srvrfrm;
interface
uses
Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs;
type
TForm1 = class(TForm)
private
{ Private declarations }
public
{ Public declarations }
end;
var
Form1: TForm1;
implementation
{$R *.dfm}
end.
这不用解释吧,呵呵;只是例出来更有利用各位的调试
远程数据模块:
//****************************************************************************//
// Pool Code(池子代码) //
// //
// 作 者: 小小 //
// 调试时间:2002-10-31 15:41 //
// 调试环境:Windows 2000 + Delphi6.0; //
// 参考资料:暂无 //
// 交流地点:www.nxit.net/bbs or www.nxit.net/bbs //
//****************************************************************************//
unit srvrdm;
 
interface
uses
Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
ComServ, ComObj, VCLCom, StdVcl, ActiveX, DataBkr, Server_TLB,
Db, DBTables, Provider, Variants;
type
TPooledRDM = class(TRemoteDataModule, IPooledRDM)
Session1: TSession;
Database1: TDatabase;
Query1: TQuery;
DataSetProvider1: TDataSetProvider;
procedure DataSetProvider1BeforeGetRecords(Sender: TObject;
var OwnerData: OleVariant);
protected
class procedure UpdateRegistry(Register: Boolean; const ClassID, ProgID: string); override;
private
{ Private declarations }
public
{ Public declarations }
end;
var
{ Need a reference to the ClassFactory so the pooler can create instances of the
class. }
RDMFactory: TComponentFactory;
/////这儿定义了一个类工厂,如果你对它不明白,请看一看COM DCOM里边有;
////我觉的这个单元文件没有什么可解释的;
implementation
{$R *.dfm}
procedure TPooledRDM.DataSetProvider1BeforeGetRecords(Sender: TObject;
var OwnerData: OleVariant);
begin
try
Query1.Close;
Query1.SQL.Text := OwnerData[0];
if not VarIsNull(OwnerData[1]) and not VarIsEmpty(OwnerData[1]) then
begin
Query1.Open;
if not Query1.Locate(Query1.Fields[0].FieldName, OwnerData[1], []) then
raise Exception.Create('Record not found');
Query1.Next;
end;
finally
OwnerData := NULL;
end;
end;
class procedure TPooledRDM.UpdateRegistry(Register: Boolean; const ClassID, ProgID: string);
begin
if Register then
begin
inherited UpdateRegistry(Register, ClassID, ProgID);
EnableSocketTransport(ClassID);
EnableWebTransport(ClassID);
end else
begin
DisableSocketTransport(ClassID);
DisableWebTransport(ClassID);
inherited UpdateRegistry(Register, ClassID, ProgID);
end;
end;
 
initialization
RDMFactory := TComponentFactory.Create(ComServer, TPooledRDM,
Class_PooledRDM, ciInternal, tmApartment);///////////希望你可以明白这些代码
end.
 
//****************************************************************************//
// Pool Code(池子代码) //
// //
// 作 者: 小小 //
// 调试时间:2002-10-31 15:41 //
// 调试环境:Windows 2000 + Delphi6.0; //
// 参考资料:暂无 //
// 交流地点:www.nxit.net/bbs or www.nxit.net/bbs //
//****************************************************************************//
unit Server_TLB;
// ************************************************************************ //
// 如些之类的代码在回调机制中,我详细的说明过,这儿,我不准备再说;你们
// 可以自己分析
************************************************************************ //
 
{$TYPEDADDRESS OFF} // Unit must be compiled without type-checked pointers.
{$WARN SYMBOL_PLATFORM OFF}
{$WRITEABLECONST ON}
interface
uses ActiveX, Classes, Graphics, Midas, StdVCL, Variants, Windows;
 
 
 
const
// TypeLibrary Major and minor versions
ServerMajorVersion = 1;
ServerMinorVersion = 0;
LIBID_Server: TGUID = '{0CE99800-9F28-11D1-8944-00A0248E5091}';
IID_IPooledRDM: TGUID = '{0CE99801-9F28-11D1-8944-00A0248E5091}';
CLASS_PooledRDM: TGUID = '{0CE99802-9F28-11D1-8944-00A0248E5091}';
CLASS_Pooler: TGUID = '{0CE99804-9F28-11D1-8944-00A0248E5091}';
type
 
IPooledRDM = interface;
IPooledRDMDisp = dispinterface;
PooledRDM = IPooledRDM;
Pooler = IPooledRDM;
 
IPooledRDM = interface(IAppServer)
['{0CE99801-9F28-11D1-8944-00A0248E5091}']
end;
IPooledRDMDisp = dispinterface
['{0CE99801-9F28-11D1-8944-00A0248E5091}']
function AS_ApplyUpdates(const ProviderName: WideString; Delta: OleVariant;
MaxErrors: Integer; out ErrorCount: Integer; var OwnerData: OleVariant): OleVariant; dispid 20000000;
function AS_GetRecords(const ProviderName: WideString; Count: Integer; out RecsOut: Integer;
Options: Integer; const CommandText: WideString;
var Params: OleVariant; var OwnerData: OleVariant): OleVariant; dispid 20000001;
function AS_DataRequest(const ProviderName: WideString; Data: OleVariant): OleVariant; dispid 20000002;
function AS_GetProviderNames: OleVariant; dispid 20000003;
function AS_GetParams(const ProviderName: WideString; var OwnerData: OleVariant): OleVariant; dispid 20000004;
function AS_RowRequest(const ProviderName: WideString; Row: OleVariant; RequestType: Integer;
var OwnerData: OleVariant): OleVariant; dispid 20000005;
procedure AS_Execute(const ProviderName: WideString; const CommandText: WideString;
var Params: OleVariant; var OwnerData: OleVariant); dispid 20000006;
end;
CoPooledRDM = class
class function Create: IPooledRDM;
class function CreateRemote(const MachineName: string): IPooledRDM;
end;
CoPooler = class
class function Create: IPooledRDM;
class function CreateRemote(const MachineName: string): IPooledRDM;
end;
implementation
uses ComObj;
class function CoPooledRDM.Create: IPooledRDM;
begin
Result := CreateComObject(CLASS_PooledRDM) as IPooledRDM;
end;
class function CoPooledRDM.CreateRemote(const MachineName: string): IPooledRDM;
begin
Result := CreateRemoteComObject(MachineName, CLASS_PooledRDM) as IPooledRDM;
end;
class function CoPooler.Create: IPooledRDM;
begin
Result := CreateComObject(CLASS_Pooler) as IPooledRDM;
end;
class function CoPooler.CreateRemote(const MachineName: string): IPooledRDM;
begin
Result := CreateRemoteComObject(MachineName, CLASS_Pooler) as IPooledRDM;
end;
end.
 
//****************************************************************************//
// Pool Code(池子代码) //
// //
// 作 者: 小小 //
// 调试时间:2002-10-31 15:41 //
// 调试环境:Windows 2000 + Delphi6.0; //
// 参考资料:暂无 //
// 交流地点:www.nxit.net/bbs or www.nxit.net/bbs //
//****************************************************************************//
unit ServerS_Unit;
interface
uses
ComObj, ActiveX, ServerS_TLB, Classes, SyncObjs, Windows;/////单元引用;
type
TPoolS = Class(TAutoObject,IPooler_LX)//接口的实现,前边说COM and DCOM曾说过;
Private
Function GetInterface : IPooler_LX; //获得一个接口,进而获得对像;
Procedure SetInterface(InterfaceValue : IPooler_LX);//释放一个接口,是放回去;
Function GetLock(Index : Integer) : Boolean;
Procedure ReleaseLock(Index : Integer;Var InterfaceValue : IPooler_LX);
Protected
{ IAppServer }
{**************************************************************************}
{在Protected里边,所有的声明都是基于IppAerver接口的,如计数加1,获得接口, }
{ 释放接口操作,而且还引用了协调接口类 }
{**************************************************************************}
function AS_ApplyUpdates(const ProviderName: WideString; Delta: OleVariant;
MaxErrors: Integer; out ErrorCount: Integer; var OwnerData: OleVariant): OleVariant; safecall;
function AS_GetRecords(const ProviderName: WideString; Count: Integer; out RecsOut: Integer;
Options: Integer; const CommandText: WideString;
var Params: OleVariant; var OwnerData: OleVariant): OleVariant; safecall;
function AS_DataRequest(const ProviderName: WideString; Data: OleVariant): OleVariant; safecall;
function AS_GetProviderNames: OleVariant; safecall;
function AS_GetParams(const ProviderName: WideString; var OwnerData: OleVariant): OleVariant; safecall;
function AS_RowRequest(const ProviderName: WideString; Row: OleVariant; RequestType: Integer;
var OwnerData: OleVariant): OleVariant; safecall;
procedure AS_Execute(const ProviderName: WideString; const CommandText: WideString;
var Params: OleVariant; var OwnerData: OleVariant); safecall;
end;
TPoolManager = Class(TObject)///定义一个管理接口对像类,必不可少,不然无法管理
Private
FMaxCount : Integer;////最大连接数;
FTimeOut : Integer;////连接超时;
FCriticalSectionS : TCriticalSection;////可以理解为进行协调;
FHandle : THandle
FRDMList:TList;
/////////////Function GetLock(Index : Integer) : Boolean;
///////////////////Procedure RePlease(Index : Integer InterfaceValue : IPooler_LX);
Function CreateNewInstance : IPooler_LX;
Public
Constructor Create;
Destructor Destroy;Override;
Function GetInterface : IPooler_LX; //获得一个接口,进而获得对像;
Procedure SetInterface(InterfaceValue : IPooler_LX);//释放一个接口,是放回去; s
Property TimeOut : Integer read FTimeOut;
Property MaxCount : Integer read FMaxCOunt;
end;
PRDM = ^TRDM;
TRDM = Record
InValue : IPooler_LX;
InUser : Boolean;
end;
var
PoolManager : TPoolManager;
 
implementation
uses
ComServ, SysUtils , SRDMUnit;
{ TPoolManager }
constructor TPoolManager.Create;
begin
FRDMList := TList.Create;
FCriticalSectionS := TCriticalSection.Create;
FTimeOut := 1000;
FMaxCount := 5;
FHandle := CreateSemaphore(Nil,FTimeOut,FMaxCount,Nil);
end;
{******************************************************************************}
{ CriticalSection是指一段程序码,同一时间只能有一个Thread执行它,此VCL包装了 }
{ Win32API中关於CriticalSection的4个函数InitializeCriticalSection、 }
{ EnterCriticalSection、LeaveCriticalSection及DeleteCriticalSection) }
{ }
{ Win 32 API提供了一组同步对象,如:信号灯(Semaphore)、互斥(Mutex)、 }
{ 临界区(CriticalSection)和事件(Event)等,用来解决这个问题 }
{******************************************************************************}
{******************************************************************************}
{ TPoolManager.Create是为远程数据模快创建一个缓冲池,并且进行了规定,禄始化 }
{                                  }
{                                    }
{ }
{                                    }
{                                }
{******************************************************************************}
function TPoolManager.CreateNewInstance: IPooler_LX;
var
PValue : PRDM;
begin
FCriticalSectionS.Enter;
Try
New(PValue);
PValue.InValue := RDMFactory.CreateComObject(Nil) as IPooler_LX;//类工厂的应用
PValue.InUser := True;//置为可用;
FRDMList.Add(PValue);
Result := PValue.InValue;
Finally
FCriticalSectionS.Leave;
end;
end;
destructor TPoolManager.Destroy;
var
I : Integer;
begin
FCriticalSectionS.Free;
for I := 0 to FRDMList.Count - 1 do
begin
PRDM(FRDMList).InValue := Nil;
//////////PRDM(FRDMList).InUser := False;
FreeMem(PRDM(FRDMList)); //////释放了PRDM资源;
end;
FRDMList.Free;
CloseHandle(FHandle);
inherited Destroy;
end;
function TPoolManager.GetInterface: IPooler_LX; //// 获得连接;
var
I : Integer;
begin
Result := False;
if WaitForSingleObject(FHandle,TimeOut) = WAIT_FAILED then
Raise Exception.Create('对不起,没有成功获得资源');
for I := 0 to FRDMList.Count - 1 do
begin
if GetLock(I) then
begin
Result := PRDM(FRDMList).InValue;///判断是否有可用的资源可以利用,或接口
Exit;
end;
end;
if FRDMList.Count < MaxCount then
Resule := CreateNewInstance;///新建资源
if Result = Nil then
Raise Exception.Create('对不起,不能获得资源');
end;
{******************************************************************************}
{ WaitForSingleObject()说明: }
{ WaitForSingleObject(ByVal hHandle As Long, }
{ ByVal dwMilliseconds As Long); }
{ hHandle:等待对象的句柄; }
{ dwMilliseconds:等待时间。 }
{ 该函数可以实现对一个可等待对象的等待操作,获取操作执行权。当等待的对象被释放时}
{ 函数成功返回,同时使等待对象变为有信号状态,或者超时返回。该函数用于等待 }
{ Semaphore信号时,若Semaphore信号不为0,则函数成功返回,同时使Semaphore信号记 }
{ 数减1。我们可以利用它建立一个记数信号,每次当相同的程序加载时使记数器减1,退 }
{ 出的时候使计数器加1,这样就可以限制相同程序的多份拷贝同时运行。 }
{******************************************************************************}
function TPoolManager.GetLock(Index: Integer): Boolean;
begin
FCriticalSectionS.Enter;///进锁
Try
Result := Not PRDM(FRDMList[Index]).InUser;//是否可用,是否可以获得连接;
if Not Result then
PRDM(FRDMList[Index]).InUser := True;//置连接为不在可用(对于其它用户而言)
Finally
FCriticalSectionS.Leave;
end;
end;
procedure TPoolManager.RePlease(Index: Integer;
InterfaceValue: IPooler_LX);
begin
FCritiCalSectionS.Enter;
Try
PRDM(FRDMList[Index]).InUser := False;
InterfaceValue := Nil;
ReleaseSemaphore(FHandle,1,Nil);
Finally
FCritiCalSectionS.Leave;
end;
end;
procedure TPoolManager.SetInterface(InterfaceValue: IPooler_LX);
var
I : Integer;
begin
for I := 0 to FRDMList.Count do
begin
if InterfaceValue = PRDM(FRDM).InValue then
RePlease(I,InterfaceValue);
Break;
end;
end;
{ TPoolS }
function TPoolS.AS_ApplyUpdates(const ProviderName: WideString;
Delta: OleVariant; MaxErrors: Integer; out ErrorCount: Integer;
var OwnerData: OleVariant): OleVariant;
begin
....//书上有详细的说明,每本多层系统的书上都有;
end;
function TPoolS.AS_DataRequest(const ProviderName: WideString;
Data: OleVariant): OleVariant;
begin
....//书上有详细的说明,每本多层系统的书上都有;
end;
procedure TPoolS.AS_Execute(const ProviderName, CommandText: WideString;
var Params, OwnerData: OleVariant);
begin
....//书上有详细的说明,每本多层系统的书上都有;
end;
function TPoolS.AS_GetParams(const ProviderName: WideString;
var OwnerData: OleVariant): OleVariant;
begin
....//书上有详细的说明,每本多层系统的书上都有;
end;
function TPoolS.AS_GetProviderNames: OleVariant;
begin
....//书上有详细的说明,每本多层系统的书上都有;
end;
function TPoolS.AS_GetRecords(const ProviderName: WideString;
Count: Integer; out RecsOut: Integer; Options: Integer;
const CommandText: WideString; var Params,
OwnerData: OleVariant): OleVariant;
begin
....//书上有详细的说明,每本多层系统的书上都有;
end;
function TPoolS.AS_RowRequest(const ProviderName: WideString;
Row: OleVariant; RequestType: Integer;
var OwnerData: OleVariant): OleVariant;
begin
end;
function TPoolS.GetInterface: IPooler_LX;
begin
Result := PoolManager.GetInterface;/////获得接口
end;
procedure TPoolS.SetInterface(InterfaceValue: IPooler_LX);
begin
PoolManager.SetInterface(InterfaceValue);////这是当释放一个接口是用到的代码;
end;
end.
希望对大家有用,但是需要说明一点的是,你应该理解锁定、线程、类工厂等内容,不然,理解这些,有些勉强;
小小希望你是最棒的一个;
 
后退
顶部