1. WebSockets support
By definition, HTTP connections are stateless and one-way, i.e. a client
sends a request to the server, which replies back with an answer.
There is no way to let the server send a message to the client, without a prior
request from the client side.
WebSockets is a communication protocol which is able to
upgrade a regular HTTP connection into a dual-way communication
wire.
After a safe handshake, the underlying TCP/IP socket is able to be accessed
directly, via a set of lightweight frames over an application-defined
protocol, without the HTTP overhead.
The SynBidirSock.pas
unit implements low-level server and client WebSockets
communication.
The TWebSocketProtocol
class defines an abstract WebSockets
protocol, currently implemented as several classes.
For our
Client-Server services via interfaces, we would still need to make
RESTful requests, so the basic WebSockets framing has been
enhanced to support TWebSocketProtocolRest
REST-compatible
protocols, able to use the single connection for both REST queries and
asynchronous notifications.
Two classes are available for your SOA applications:
TWebSocketProtocolJSON
as a "pure" JSON light protocol;TWebSocketProtocolBinary
as a binary proprietary protocol, with optional frame compression and AES encryption (using AES-NI hardware instructions, if available).
In practice, on the server side, you would start your
TSQLHttpServer
by specifying useBidirSocket
as kind
of server:
HttpServer := TSQLHttpServer.Create('8888',[Server],'+',useBidirSocket);
Under the hood, it will instantiate a TWebSocketServer
HTTP
server, as defined in mORMotHttpServer.pas
, based on the sockets
API, able to upgrade the HTTP protocol into WebSockets.
Our High-performance http.sys server is not yet able to switch to
WebSockets - and at API level, it would require at least Windows
8 or Windows 2012 Server.
Then you enable WebSockets for the
TWebSocketProtocolBinary
protocol, with an encryption key:
HttpServer.WebSocketsEnable(Server,'encryptionkey');
On the client side, you would use a TSQLHttpClientWebsockets
instance, as defined in mORMotHttpClient.pas
, then explicitly
upgrade the connection to use WebSockets (since by default, it will
stick to the HTTP protocol):
Client := TSQLHttpClientWebsockets.Create('127.0.0.1','8888',TSQLModel.Create([])); Client.WebSocketsUpgrade('encryptionkey');
The expected protocol detail should match the one on the server, i.e.
'encryptionkey'
encryption over our binary protocol.
Once upgraded to WebSockets, you may use regular REST commands, as usual:
Client.ServerTimeStampSynchronize;
But in addition to regular query/answer commands as defined for
Client-Server services via interfaces, you would be able to define
callbacks using interface
parameters to the service methods.
Under the hood, both client and server will communicate using
WebSockets frames, maintaining the connection active using heartbeats
(via ping/pong frames), and with clean connection shutdown, from any side. You
can use the Settings
property of the
TWebSocketServerRest
instance, as returned by
TSQLHttpServer.WebSocketsEnable()
, to customize the low-level
WebSockets protocol (e.g. timeouts or heartbeats) on the server side.
The TSQLHttpClientWebsockets.WebSockets
.Settings
property would allow the same, on the client side.
We have observed, from our regression tests and internal benchmarking, that using our WebSockets may be faster than regular HTTP, since its frames would be sent as once, whereas HTTP headers and body are not sent in the same TCP packet, and compression would be available for the whole frame, whereas HTTP headers are not compressed. The ability to use strong AES encryption would make this mean of communication even safer than plain HTTP, even with AES encryption over HTTP.
2. Using a callback to notify long term end-of-process
An example is better than 100 talks.
So let's take a look at the Project31LongWorkServer.dpr
and
Project31LongWorkClient.dpr
samples, from the
SQLite3\Samples\31 - WebSockets
sub-folder.
They will implement a client/server application, in which the client launches a
long term process on the server side, then is notified when the process is
done, either with success, or failure.
First we define the interfaces to be used, in a shared
Project31LongWorkCallbackInterface.pas
unit:
type ILongWorkCallback = interface(IInvokable) ['{425BF199-19C7-4B2B-B1A4-A5BE7A9A4748}'] procedure WorkFinished(const workName: string; timeTaken: integer); procedure WorkFailed(const workName, error: string); end;
ILongWorkService = interface(IInvokable) ['{09FDFCEF-86E5-4077-80D8-661801A9224A}'] procedure StartWork(const workName: string; const onFinish: ILongWorkCallback); function TotalWorkCount: Integer; end;
The only specific definition is the const onFinish:
ILongWorkCallback
parameter, supplied to the
ILongWorkService.StartWork()
method.
The client will create a class implementing ILongWorkCallback
,
then specify it as parameter to this method.
On the server side, a "fake" class will implement
ILongWorkCallback
, then will call back the client using the very
same WebSockets connection, when any of its methods will be
executed.
As you can see, a single callback interface
instance may have
several methods, with their own set of parameters (here
WorkFinished
and WorkFailed
), so that the callback
may be quite expressive.
Any kind of usual parameters would be transmitted, after serialization:
string
, integer
, but even record
,
dynamic arrays, TSQLRecord
or TPersistent
values.
When the ILongWorkCallback
instance will be released on the
client side, the server will be notified, so that any further notification
won't create a connection error.
We will see later how to handle those events.
Client service consumption
The client may be connected to the server as such (see the
Project31LongWorkClient.dpr
sample source code for the full
details, including error handling):
var Client: TSQLHttpClientWebsockets;
workName: string;
Service: ILongWorkService;
callback: ILongWorkCallback;
begin
Client := TSQLHttpClientWebsockets.Create('127.0.0.1','8888',TSQLModel.Create([]));
Client.WebSocketsUpgrade(PROJECT31_TRANSMISSION_KEY);
Client.ServiceDefine([ILongWorkService],sicShared);
Client.Services.Resolve(ILongWorkService,Service);
Then we define our callback, using a dedicated class:
type TLongWorkCallback = class(TInterfacedCallback,ILongWorkCallback) protected procedure WorkFinished(const workName: string; timeTaken: integer); procedure WorkFailed(const workName, error: string); end;
procedure TLongWorkCallback.WorkFailed(const workName, error: string); begin writeln(#13'Received callback WorkFailed(',workName,') with message "',error,'"'); end;
procedure TLongWorkCallback.WorkFinished(const workName: string; timeTaken: integer); begin writeln(#13'Received callback WorkFinished(',workName,') in ',timeTaken,'ms'); end;
Then we specify this kind of callback as parameter to start a long term work:
callback := TLongWorkCallback.Create(Client,ILongWorkCallback); try repeat readln(workName); if workName='' then break; Service.StartWork(workName,callback); until false; finally callback := nil; // the server will be notified and release its "fake" class Service := nil; // release the service local instance BEFORE Client.Free end;
As you can see, the client is able to start one or several work processes,
then expects to be notified of the process ending on its callback
interface
instance, without explicitly polling the server for its
state, since the connection was upgraded to WebSockets via a call to
TSQLHttpClientWebsockets.WebSocketsUpgrade()
.
Server side implementation
The server would define the working thread as such (see the
Project31LongWorkServer.dpr
sample source code for the full
details):
type TLongWorkServiceThread = class(TThread) protected fCallback: ILongWorkCallback; fWorkName: string; procedure Execute; override; public constructor Create(const workName: string; const callback: ILongWorkCallback); end;
constructor TLongWorkServiceThread.Create(const workName: string; const callback: ILongWorkCallback); begin inherited Create(false); fCallback := Callback; fWorkName := workName; FreeOnTerminate := true; end;
procedure TLongWorkServiceThread.Execute; var tix: Int64; begin tix := GetTickCount64; Sleep(5000+Random(1000)); // some hard work if Random(100)>20 then fCallback.WorkFinished(fWorkName,GetTickCount64-tix) else fCallback.WorkFailed(fWorkName,'expected random failure'); end;
The callback is expected to be supplied as a ILongWorkCallback
interface instance, then stored in a fCallback
protected field for
further notification.
Some work is done in the TLongWorkServiceThread.Execute
method
(here just a Sleep()
of more than 5 seconds), and the end-of-work
notification is processed, as success or failure (depending on random in this
fake process class), on either of the ILongWorkCallback
interface
methods.
The following class
will define, implement and register the
ILongWorkService
service on the server side:
type TLongWorkService = class(TInterfacedObject,ILongWorkService) protected fTotalWorkCount: Integer; public procedure StartWork(const workName: string; const onFinish: ILongWorkCallback); function TotalWorkCount: Integer; end;
procedure TLongWorkService.StartWork(const workName: string; const onFinish: ILongWorkCallback); begin InterlockedIncrement(fTotalWorkCount); TLongWorkServiceThread.Create(workName,onFinish); end;
function TLongWorkService.TotalWorkCount: Integer; begin result := fTotalWorkCount; end;
var HttpServer: TSQLHttpServer; Server: TSQLRestServerFullMemory; begin Server := TSQLRestServerFullMemory.CreateWithOwnModel([]); Server.ServiceDefine(TLongWorkService,[ILongWorkService],sicShared); HttpServer := TSQLHttpServer.Create('8888',[Server],'+',useBidirSocket); HttpServer.WebSocketsEnable(Server,PROJECT31_TRANSMISSION_KEY); ...
Purpose of those methods is just to create and launch the
TLongWorkServiceThread
process from a client request, then
maintain a total count of started works, in a sicShared
service
instance - see
Instances life time implementation - hosted in a
useBidirSocket
kind of HTTP server.
We have to explicitly call TSQLHttpServer.WebSocketsEnable()
so
that this server would be able to upgrade to our WebSockets protocol,
using our binary framing, and the very same encryption key as on the client
side - shared as a PROJECT31_TRANSMISSION_KEY
constant in the
sample, but which may be safely stored on both sides.
3. Publish-subscribe for events
In event-driven architectures, the publish-subscribe messaging
pattern is a way of letting senders (called publishers) transmit
messages to their receivers (called subscribers), without any prior
knowledge of who those subscribers are.
In practice, the subscribers will express interest for a set of
messages, which will be sent by the publisher to all the
subscribers of a given message, as soon as it is be notified.
In our Client-Server services via interfaces implementation,
messages are gathered in interface
types, and each message defined
as a single method, their content being the methods parameters.
Most of the SOA alternative (in Java or C#) do require class definition for
messages. Our KISS approach will just use method parameters values as message
definition.
To maintain a list of subscribers, the easiest is to store a
dynamic array of interface
instances, on the
publisher side.
Defining the interfaces
We will now implement a simple chat service, able to let several
clients communicate together, broadcasting any message to all the other
connected instances.
This sample is also located in the the
SQLite3\Samples\31 - WebSockets
sub-folder, as
Project31ChatServer.dpr
and
Project31ChatClient.dpr
.
So you first define the callback interface, and the service interface:
type IChatCallback = interface(IInvokable) ['{EA7EFE51-3EBA-4047-A356-253374518D1D}'] procedure BlaBla(const pseudo, msg: string); end;
IChatService = interface(IInvokable) ['{C92DCBEA-C680-40BD-8D9C-3E6F2ED9C9CF}'] procedure Join(const pseudo: string; const callback: IChatCallback); procedure BlaBla(const pseudo,msg: string); procedure CallbackReleased(const callback: IInvokable); end;
Those interface types will be shared by both server and client sides, in the
common Project31ChatCallbackInterface.pas
unit. The definition is
pretty close to what we wrote just above to notify long term
end-of-process.
The only additional method is IChatServer.CallbackReleased()
,
which, by convention, will be called on the server side when any
callback
interface instance is released on the client side.
As such, the IChatService.Join()
method will implement the
subscription to the chat service, whereas
IChatServer.CallbackReleased()
will be called when the client-side
callback instance will be released (i.e. when its variable will be assigned to
nil
), to unsubscribe for the chat service.
Writing the Publisher
On the server side, each call to IChatService.Join()
would
subscribe to an internal list of connections, simply stored as an
array of IChatCallback
:
type TChatService = class(TInterfacedObject,IChatService) protected fConnected: array of IChatCallback; public procedure Join(const pseudo: string; const callback: IChatCallback); procedure BlaBla(const pseudo,msg: string); procedure CallbackReleased(const callback: IInvokable); end;
procedure TChatService.Join(const pseudo: string; const callback: IChatCallback); begin InterfaceArrayAdd(fConnected,callback); end;
The InterfaceArrayAdd()
function, as defined in
SynCommons.pas
, is a simple wrapper around any dynamic
array of interface
instances, so that you may use it, or the
associated InterfaceArrayFind()
or
InterfaceArrayDelete()
functions, to maintain the list of
subscriptions.
Then a remote call to the IChatService.BlaBla()
method should
be broadcasted to all connected clients, just by calling the
IChatCallback.BlaBla()
method:
procedure TChatService.BlaBla(const pseudo,msg: string);
var i: integer;
begin
for i := 0 to high(fConnected) do
fConnected[i].BlaBla(pseudo,msg);
end;
Note that every call to IChatCallback.BlaBla()
within the loop
would be made via WebSockets, in an asynchronous and non blocking way,
so that even in case of huge number of clients, the
IChatService.BlaBla()
method won't block.
In case of high numbers of messages, the framework is even able to
gather push notification messages into a single bigger message, to
reduce the resource use.
On the server side, the service implementation has been registered as such:
Server.ServiceDefine(TChatService,[IChatService],sicShared). SetOptions([],[optExecLockedPerInterface]);
Here, the optExecLockedPerInterface
option has been set, so
that all method calls would be made thread-safe, so that concurrent access to
the internal fConnected[]
list would be safe.
Since a global list of connections is to be maintained, the service life time
is defined as sicShared
- see
Instances life time implementation.
The following method will be called by the server, when a client callback
instance is released (either explicitly, or if the connection is broken), so
could be used to unsubscribe to the notification, simply by deleting
the callback from the internal fConnected[]
array:
procedure TChatService.CallbackReleased(const callback: IInvokable);
begin
InterfaceArrayDelete(fConnected,callback);
end;
The framework will in fact recognize the following method definition in any
interface
type for a service:
procedure CallbackReleased(const callback: IInvokable);
When a callback interface
parameter (in our case,
IChatCallback
) will be released on the client side, this method
will be called with the corresponding interface
instance as
parameter.
You do not have to call explicitly any method on the client side to
unsubscribe a service: assigning nil to a callback variable,
or feeing the class
instance owning it as a field on the
subscriber side, will automatically unregister it on the
publisher side.
Consuming the service from the Subscriber side
On the client side, you implement the IChatCallback
callback
interface:
type TChatCallback = class(TInterfacedCallback,IChatCallback) protected procedure BlaBla(const pseudo, msg: string); end;
procedure TChatCallback.BlaBla(const pseudo, msg: string); begin writeln(#13'@',pseudo,' ',msg); end;
Then you subscribe to your remote service as such:
var Service: IChatService; callback: IChatCallback; ... Client.ServiceDefine([IChatService],sicShared); if not Client.Services.Resolve(IChatService,Service) then raise EServiceException.Create('Service IChatService unavailable'); ... callback := TChatCallback.Create(Client,IChatCallback); Service.Join(pseudo,callback); ... try repeat readln(msg); if msg='' then break; Service.BlaBla(pseudo,msg); until false; finally callback := nil; // will unsubscribe from the remote publisher Service := nil; // release the service local instance BEFORE Client.Free end;
You could easily implement more complex publish/subscribe
mechanisms, including filtering, time to live or tuned broadcasting, by storing
some additional information to the interface
instance (e.g. some
value to filter, a timestamp). A dynamic array of dedicated
record
s, or a list of class
instances, may be used to
store the subscribers expectations.
If you compare with existing client/server SOA solutions (in Delphi, Java,
C# or even in Go or other frameworks), this interface
-based
callback mechanism sounds pretty unique and easy to work with.
In fact, this is a good way of implementing callbacks conforming to SOLID
design principles on the server side, and let the mORMot
framework publish this mechanism in a client/server way, by using
WebSockets. The very same code could be used on the server side, with
no transmission nor marshaling overhead (via direct interface
calls), and over a network, with optimized use of resource and bandwidth (via
"fake" interface
calls, and JSON marshalling over TCP/IP).
The ORM part of the framework uses this Publish-Subscribe feature to implement Real-Time master/slave replication.
Feedback is welcome on our forum, as usual.