1. WebSockets support

By definition, HTTP connections are stateless and one-way, i.e. a client sends a request to the server, which replies back with an answer.
There is no way to let the server send a message to the client, without a prior request from the client side.

WebSockets is a communication protocol which upgrades a regular HTTP connection into a bidirectional communication channel.
After an opening handshake, the underlying TCP/IP socket can be accessed directly, via a set of lightweight frames carrying an application-defined protocol, without the HTTP overhead.
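
To give an idea of how lightweight those frames are, the following sketch builds the header of a single unmasked frame, following the layout of RFC 6455 (the WebSockets standard). It is a stand-alone illustration, not the code of SynBidirSock.pas: the BuildFrameHeader function, the TFrameHeader type and the OPCODE_* constant names are made up for this example.

```pascal
program FrameHeaderSketch;

{$ifdef FPC}{$mode delphi}{$endif}
{$ifdef MSWINDOWS}{$APPTYPE CONSOLE}{$endif}

type
  TFrameHeader = array of Byte; // hypothetical type name

const
  // opcode values as assigned by RFC 6455
  OPCODE_TEXT   = $1;
  OPCODE_BINARY = $2;
  OPCODE_PING   = $9;

// Builds the header of one unfragmented, unmasked frame
// (payloadLen is assumed to be non-negative).
function BuildFrameHeader(opcode: Byte; payloadLen: Int64): TFrameHeader;
var
  i: Integer;
begin
  if payloadLen <= 125 then
  begin
    // short payload: the length fits in the second byte
    SetLength(Result, 2);
    Result[1] := Byte(payloadLen);
  end
  else if payloadLen <= $FFFF then
  begin
    // marker 126, then a 16-bit big-endian length
    SetLength(Result, 4);
    Result[1] := 126;
    Result[2] := Byte(payloadLen shr 8);
    Result[3] := Byte(payloadLen and $FF);
  end
  else
  begin
    // marker 127, then a 64-bit big-endian length
    SetLength(Result, 10);
    Result[1] := 127;
    for i := 0 to 7 do
      Result[2 + i] := Byte((payloadLen shr (8 * (7 - i))) and $FF);
  end;
  // first byte: FIN bit ($80) plus the 4-bit opcode
  Result[0] := $80 or (opcode and $0F);
end;

var
  h: TFrameHeader;
begin
  h := BuildFrameHeader(OPCODE_TEXT, 5);
  WriteLn('text frame, 5 bytes of payload: header length = ', Length(h));
  h := BuildFrameHeader(OPCODE_BINARY, 300);
  WriteLn('binary frame, 300 bytes of payload: header length = ', Length(h));
  h := BuildFrameHeader(OPCODE_BINARY, 70000);
  WriteLn('binary frame, 70000 bytes of payload: header length = ', Length(h));
end.
```

A short payload therefore costs only two header bytes (frames sent by a client add a four-byte masking key), which is far less than a typical set of HTTP headers.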

The SynBidirSock.pas unit implements low-level server and client WebSockets communication.

The TWebSocketProtocol class defines an abstract WebSockets protocol, currently implemented as several classes.

For our Client-Server services via interfaces, we would still need to make RESTful requests, so the basic WebSockets framing has been enhanced to support TWebSocketProtocolRest REST-compatible protocols, able to use the single connection for both REST queries and asynchronous notifications.

Two classes are available for your SOA applications:

  • TWebSocketProtocolJSON as a "pure" JSON light protocol;
  • TWebSocketProtocolBinary as a binary proprietary protocol, with optional frame compression and AES encryption (using AES-NI hardware instructions, if available).

In practice, on the server side, you would start your TSQLHttpServer by specifying useBidirSocket as the kind of server:

 HttpServer := TSQLHttpServer.Create('8888',[Server],'+',useBidirSocket);

Under the hood, it will instantiate a TWebSocketServer HTTP server, as defined in mORMotHttpServer.pas, based on the sockets API, able to upgrade the HTTP protocol into WebSockets.
Our High-performance http.sys server is not yet able to switch to WebSockets - and at API level, it would require at least Windows 8 or Windows 2012 Server.

Then you enable WebSockets for the TWebSocketProtocolBinary protocol, with an encryption key:

   HttpServer.WebSocketsEnable(Server,'encryptionkey');

On the client side, you would use a TSQLHttpClientWebsockets instance, as defined in mORMotHttpClient.pas, then explicitly upgrade the connection to use WebSockets (since by default, it will stick to the HTTP protocol):

  Client := TSQLHttpClientWebsockets.Create('127.0.0.1','8888',TSQLModel.Create([]));
  Client.WebSocketsUpgrade('encryptionkey');

The expected protocol detail should match the one on the server, i.e. 'encryptionkey' encryption over our binary protocol.

Once upgraded to WebSockets, you may use regular REST commands, as usual:

  Client.ServerTimeStampSynchronize;

But in addition to regular query/answer commands as defined for Client-Server services via interfaces, you would be able to define callbacks using interface parameters to the service methods.

Under the hood, both client and server will communicate using WebSockets frames, maintaining the connection active using heartbeats (via ping/pong frames), and with clean connection shutdown, from any side. You can use the Settings property of the TWebSocketServerRest instance, as returned by TSQLHttpServer.WebSocketsEnable(), to customize the low-level WebSockets protocol (e.g. timeouts or heartbeats) on the server side. The TSQLHttpClientWebsockets.WebSockets.Settings property would allow the same, on the client side.

We have observed, from our regression tests and internal benchmarking, that using our WebSockets may be faster than regular HTTP: each frame is sent at once, whereas HTTP headers and body are not sent in the same TCP packet, and compression is available for the whole frame, whereas HTTP headers are not compressed. The ability to use strong AES encryption makes this means of communication safer than plain HTTP, even when compared with AES encryption over HTTP.

2. Using a callback to notify long term end-of-process

An example is better than 100 talks.
So let's take a look at the Project31LongWorkServer.dpr and Project31LongWorkClient.dpr samples, from the SQLite3\Samples\31 - WebSockets sub-folder.
They will implement a client/server application, in which the client launches a long term process on the server side, then is notified when the process is done, either with success, or failure.

First we define the interfaces to be used, in a shared Project31LongWorkCallbackInterface.pas unit:

type
  ILongWorkCallback = interface(IInvokable)
    ['{425BF199-19C7-4B2B-B1A4-A5BE7A9A4748}']
    procedure WorkFinished(const workName: string; timeTaken: integer);
    procedure WorkFailed(const workName, error: string);
  end;
  ILongWorkService = interface(IInvokable)
    ['{09FDFCEF-86E5-4077-80D8-661801A9224A}']
    procedure StartWork(const workName: string; const onFinish: ILongWorkCallback);
    function TotalWorkCount: Integer;
  end;

The only specific definition is the const onFinish: ILongWorkCallback parameter, supplied to the ILongWorkService.StartWork() method.
The client will create a class implementing ILongWorkCallback, then specify it as parameter to this method.
On the server side, a "fake" class will implement ILongWorkCallback, and will call back the client over the very same WebSockets connection whenever one of its methods is executed.

As you can see, a single callback interface instance may have several methods, with their own set of parameters (here WorkFinished and WorkFailed), so that the callback may be quite expressive.
Any kind of usual parameters would be transmitted, after serialization: string, integer, but even record, dynamic arrays, TSQLRecord or TPersistent values.

When the ILongWorkCallback instance is released on the client side, the server is notified, so that any further notification won't create a connection error.
We will see later how to handle those events.

Client service consumption

The client may be connected to the server as such (see the Project31LongWorkClient.dpr sample source code for the full details, including error handling):

var Client: TSQLHttpClientWebsockets;
    workName: string;
    Service: ILongWorkService;
    callback: ILongWorkCallback;
begin
  Client := TSQLHttpClientWebsockets.Create('127.0.0.1','8888',TSQLModel.Create([]));
  Client.WebSocketsUpgrade(PROJECT31_TRANSMISSION_KEY);
  Client.ServiceDefine([ILongWorkService],sicShared);
  Client.Services.Resolve(ILongWorkService,Service);

Then we define our callback, using a dedicated class:

type
  TLongWorkCallback = class(TInterfacedCallback,ILongWorkCallback)
  protected
    procedure WorkFinished(const workName: string; timeTaken: integer);
    procedure WorkFailed(const workName, error: string);
  end;
procedure TLongWorkCallback.WorkFailed(const workName, error: string);
begin
  writeln(#13'Received callback WorkFailed(',workName,') with message "',error,'"');
end;

procedure TLongWorkCallback.WorkFinished(const workName: string; timeTaken: integer);
begin
  writeln(#13'Received callback WorkFinished(',workName,') in ',timeTaken,'ms');
end;

Then we specify this kind of callback as parameter to start a long term work:

    callback := TLongWorkCallback.Create(Client,ILongWorkCallback);
    try
      repeat
        readln(workName);
        if workName='' then
          break;
        Service.StartWork(workName,callback);
      until false;
    finally
      callback := nil; // the server will be notified and release its "fake" class
      Service := nil;  // release the service local instance BEFORE Client.Free
    end;

As you can see, the client is able to start one or several work processes, then expects to be notified of the process ending on its callback interface instance, without explicitly polling the server for its state, since the connection was upgraded to WebSockets via a call to TSQLHttpClientWebsockets.WebSocketsUpgrade().

Server side implementation

The server would define the working thread as such (see the Project31LongWorkServer.dpr sample source code for the full details):

type
  TLongWorkServiceThread = class(TThread)
  protected
    fCallback: ILongWorkCallback;
    fWorkName: string;
    procedure Execute; override;
  public
    constructor Create(const workName: string; const callback: ILongWorkCallback);
  end;

constructor TLongWorkServiceThread.Create(const workName: string; const callback: ILongWorkCallback);
begin
  inherited Create(false);
  fCallback := Callback;
  fWorkName := workName;
  FreeOnTerminate := true;
end;

procedure TLongWorkServiceThread.Execute;
var tix: Int64;
begin
  tix := GetTickCount64;
  Sleep(5000+Random(1000)); // some hard work
  if Random(100)>20 then
    fCallback.WorkFinished(fWorkName,GetTickCount64-tix) else
    fCallback.WorkFailed(fWorkName,'expected random failure');
end;

The callback is expected to be supplied as an ILongWorkCallback interface instance, then stored in the fCallback protected field for further notification.
Some work is done in the TLongWorkServiceThread.Execute method (here just a Sleep() of more than 5 seconds), and the end-of-work notification is processed, as success or failure (depending on random in this fake process class), on either of the ILongWorkCallback interface methods.

The following class will define, implement and register the ILongWorkService service on the server side:

type
  TLongWorkService = class(TInterfacedObject,ILongWorkService)
  protected
    fTotalWorkCount: Integer;
  public
    procedure StartWork(const workName: string; const onFinish: ILongWorkCallback);
    function TotalWorkCount: Integer;
  end;

procedure TLongWorkService.StartWork(const workName: string; const onFinish: ILongWorkCallback);
begin
  InterlockedIncrement(fTotalWorkCount);
  TLongWorkServiceThread.Create(workName,onFinish);
end;

function TLongWorkService.TotalWorkCount: Integer;
begin
  result := fTotalWorkCount;
end;

var HttpServer: TSQLHttpServer;
    Server: TSQLRestServerFullMemory;
begin
  Server := TSQLRestServerFullMemory.CreateWithOwnModel([]);
  Server.ServiceDefine(TLongWorkService,[ILongWorkService],sicShared);
  HttpServer := TSQLHttpServer.Create('8888',[Server],'+',useBidirSocket);
  HttpServer.WebSocketsEnable(Server,PROJECT31_TRANSMISSION_KEY);
  ...

The purpose of those methods is just to create and launch the TLongWorkServiceThread process from a client request, and to maintain a total count of started work items, in a sicShared service instance - see Instances life time implementation - hosted in a useBidirSocket kind of HTTP server.

We have to explicitly call TSQLHttpServer.WebSocketsEnable() so that this server would be able to upgrade to our WebSockets protocol, using our binary framing, and the very same encryption key as on the client side - shared as a PROJECT31_TRANSMISSION_KEY constant in the sample, but which may be safely stored on both sides.

3. Publish-subscribe for events

In event-driven architectures, the publish-subscribe messaging pattern is a way of letting senders (called publishers) transmit messages to their receivers (called subscribers), without any prior knowledge of who those subscribers are.
In practice, the subscribers will express interest in a set of messages, which will be sent by the publisher to all the subscribers of a given message, as soon as it is notified.

In our Client-Server services via interfaces implementation, messages are gathered in interface types, and each message is defined as a single method, its content being the method parameters.
Most SOA alternatives (in Java or C#) require a class definition for each message. Our KISS approach will just use method parameter values as message definition.

To maintain a list of subscribers, the easiest is to store a dynamic array of interface instances, on the publisher side.

Defining the interfaces

We will now implement a simple chat service, able to let several clients communicate together, broadcasting any message to all the other connected instances.
This sample is also located in the SQLite3\Samples\31 - WebSockets sub-folder, as Project31ChatServer.dpr and Project31ChatClient.dpr.

So you first define the callback interface, and the service interface:

type
  IChatCallback = interface(IInvokable)
    ['{EA7EFE51-3EBA-4047-A356-253374518D1D}']
    procedure BlaBla(const pseudo, msg: string);
  end;

  IChatService = interface(IInvokable)
    ['{C92DCBEA-C680-40BD-8D9C-3E6F2ED9C9CF}']
    procedure Join(const pseudo: string; const callback: IChatCallback);
    procedure BlaBla(const pseudo,msg: string);
    procedure CallbackReleased(const callback: IInvokable);
  end;

Those interface types will be shared by both server and client sides, in the common Project31ChatCallbackInterface.pas unit. The definition is pretty close to what we wrote just above to notify long term end-of-process.
The only additional method is IChatService.CallbackReleased(), which, by convention, will be called on the server side when any callback interface instance is released on the client side.

As such, the IChatService.Join() method will implement the subscription to the chat service, whereas IChatService.CallbackReleased() will be called when the client-side callback instance is released (i.e. when its variable is assigned to nil), to unsubscribe from the chat service.

Writing the Publisher

On the server side, each call to IChatService.Join() would subscribe to an internal list of connections, simply stored as an array of IChatCallback:

type
  TChatService = class(TInterfacedObject,IChatService)
  protected
    fConnected: array of IChatCallback;
  public
    procedure Join(const pseudo: string; const callback: IChatCallback);
    procedure BlaBla(const pseudo,msg: string);
    procedure CallbackReleased(const callback: IInvokable);
  end;

procedure TChatService.Join(const pseudo: string; const callback: IChatCallback);
begin
  InterfaceArrayAdd(fConnected,callback);
end;

The InterfaceArrayAdd() function, as defined in SynCommons.pas, is a simple wrapper around any dynamic array of interface instances, so that you may use it, or the associated InterfaceArrayFind() or InterfaceArrayDelete() functions, to maintain the list of subscriptions.
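
To make the idea concrete, here is a minimal stand-alone sketch of what such add/find/delete helpers could look like for one given interface type. It is not the SynCommons.pas implementation: the INotify interface, the TConsoleNotify class and the NotifyAdd(), NotifyFind() and NotifyDelete() functions are hypothetical names, and the calls are made in-process, without any WebSockets layer.

```pascal
program SubscriberListSketch;

{$ifdef FPC}{$mode delphi}{$endif}
{$ifdef MSWINDOWS}{$APPTYPE CONSOLE}{$endif}

type
  // hypothetical callback interface, standing in for IChatCallback
  INotify = interface
    procedure Notify(const msg: string);
  end;
  TNotifyArray = array of INotify;

  TConsoleNotify = class(TInterfacedObject, INotify)
  private
    fName: string;
  public
    constructor Create(const aName: string);
    procedure Notify(const msg: string);
  end;

constructor TConsoleNotify.Create(const aName: string);
begin
  inherited Create;
  fName := aName;
end;

procedure TConsoleNotify.Notify(const msg: string);
begin
  WriteLn(fName, ' received: ', msg);
end;

// returns the index of item within list, or -1 if it is not stored
function NotifyFind(const list: TNotifyArray; const item: INotify): Integer;
var
  i: Integer;
begin
  Result := -1;
  for i := 0 to High(list) do
    if list[i] = item then
    begin
      Result := i;
      exit;
    end;
end;

// appends item at the end of list (the array holds one reference to it)
procedure NotifyAdd(var list: TNotifyArray; const item: INotify);
var
  n: Integer;
begin
  n := Length(list);
  SetLength(list, n + 1);
  list[n] := item;
end;

// removes item from list, returning false if it was not stored
function NotifyDelete(var list: TNotifyArray; const item: INotify): Boolean;
var
  i, j: Integer;
begin
  i := NotifyFind(list, item);
  Result := i >= 0;
  if not Result then
    exit;
  for j := i to High(list) - 1 do
    list[j] := list[j + 1];          // shift the following items down
  SetLength(list, Length(list) - 1); // releases the last, now duplicated, slot
end;

var
  subscribers: TNotifyArray;
  alice, bob: INotify;
  i: Integer;
begin
  alice := TConsoleNotify.Create('alice');
  bob := TConsoleNotify.Create('bob');
  NotifyAdd(subscribers, alice);
  NotifyAdd(subscribers, bob);
  for i := 0 to High(subscribers) do
    subscribers[i].Notify('hello'); // both subscribers are notified
  NotifyDelete(subscribers, alice); // alice unsubscribes
  for i := 0 to High(subscribers) do
    subscribers[i].Notify('bye');   // only bob is left in the list
end.
```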

Then a remote call to the IChatService.BlaBla() method should be broadcast to all connected clients, just by calling the IChatCallback.BlaBla() method:

procedure TChatService.BlaBla(const pseudo,msg: string);
var i: integer;
begin
  for i := 0 to high(fConnected) do
    fConnected[i].BlaBla(pseudo,msg);
end;

Note that every call to IChatCallback.BlaBla() within the loop would be made via WebSockets, in an asynchronous and non blocking way, so that even in case of huge number of clients, the IChatService.BlaBla() method won't block.
In case of high numbers of messages, the framework is even able to gather push notification messages into a single bigger message, to reduce the resource use.

On the server side, the service implementation has been registered as such:

  Server.ServiceDefine(TChatService,[IChatService],sicShared).
    SetOptions([],[optExecLockedPerInterface]);

Here, the optExecLockedPerInterface option has been set, so that all method calls are made thread-safe, and concurrent access to the internal fConnected[] list is safe.
Since a global list of connections is to be maintained, the service life time is defined as sicShared - see Instances life time implementation.
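
Conceptually, making every method call thread-safe amounts to running all methods of the service under one shared lock. The stand-alone sketch below shows that general technique with a plain TCriticalSection from the RTL. It is an assumption about the idea behind such an option, not the framework's actual implementation, and TLockedRegistry and TJoinThread are hypothetical names.

```pascal
program LockedServiceSketch;

{$ifdef FPC}{$mode delphi}{$endif}
{$ifdef MSWINDOWS}{$APPTYPE CONSOLE}{$endif}

uses
  {$ifdef FPC}{$ifdef UNIX}cthreads,{$endif}{$endif}
  Classes, SysUtils, SyncObjs;

type
  // hypothetical service: every public method takes the same lock
  TLockedRegistry = class
  private
    fLock: TCriticalSection;
    fNames: array of string;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Join(const pseudo: string);
    function Count: Integer;
  end;

  // simulates one client calling the service concurrently
  TJoinThread = class(TThread)
  private
    fRegistry: TLockedRegistry;
  protected
    procedure Execute; override;
  public
    constructor Create(aRegistry: TLockedRegistry);
  end;

constructor TLockedRegistry.Create;
begin
  inherited Create;
  fLock := TCriticalSection.Create;
end;

destructor TLockedRegistry.Destroy;
begin
  fLock.Free;
  inherited Destroy;
end;

procedure TLockedRegistry.Join(const pseudo: string);
begin
  fLock.Enter;
  try
    // resizing a shared dynamic array is not safe without the lock
    SetLength(fNames, Length(fNames) + 1);
    fNames[High(fNames)] := pseudo;
  finally
    fLock.Leave;
  end;
end;

function TLockedRegistry.Count: Integer;
begin
  fLock.Enter;
  try
    Result := Length(fNames);
  finally
    fLock.Leave;
  end;
end;

constructor TJoinThread.Create(aRegistry: TLockedRegistry);
begin
  fRegistry := aRegistry;  // set before the thread may start running
  inherited Create(false);
end;

procedure TJoinThread.Execute;
var
  i: Integer;
begin
  for i := 1 to 1000 do
    fRegistry.Join('user' + IntToStr(i));
end;

var
  registry: TLockedRegistry;
  threads: array[0..3] of TJoinThread;
  i: Integer;
begin
  registry := TLockedRegistry.Create;
  try
    for i := 0 to High(threads) do
      threads[i] := TJoinThread.Create(registry);
    for i := 0 to High(threads) do
    begin
      threads[i].WaitFor;
      threads[i].Free;
    end;
    // 4 threads x 1000 calls: no entry is lost thanks to the lock
    WriteLn('registered entries: ', registry.Count);
  finally
    registry.Free;
  end;
end.
```

The price of this simplicity is that calls are serialized, so each locked method should return quickly.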

The following method will be called by the server when a client callback instance is released (either explicitly, or if the connection is broken), so it could be used to unsubscribe from the notification, simply by deleting the callback from the internal fConnected[] array:

procedure TChatService.CallbackReleased(const callback: IInvokable);
begin
  InterfaceArrayDelete(fConnected,callback);
end;

The framework will in fact recognize the following method definition in any interface type for a service:

   procedure CallbackReleased(const callback: IInvokable);

When a callback interface parameter (in our case, IChatCallback) is released on the client side, this method will be called with the corresponding interface instance as parameter.
You do not have to explicitly call any method on the client side to unsubscribe from a service: assigning nil to a callback variable, or freeing the class instance owning it as a field on the subscriber side, will automatically unregister it on the publisher side.

Consuming the service from the Subscriber side

On the client side, you implement the IChatCallback callback interface:

type
  TChatCallback = class(TInterfacedCallback,IChatCallback)
  protected
    procedure BlaBla(const pseudo, msg: string);
  end;

procedure TChatCallback.BlaBla(const pseudo, msg: string);
begin
  writeln(#13'@',pseudo,' ',msg);
end;

Then you subscribe to your remote service as such:

var Service: IChatService;
    callback: IChatCallback;
...
    Client.ServiceDefine([IChatService],sicShared);
    if not Client.Services.Resolve(IChatService,Service) then
      raise EServiceException.Create('Service IChatService unavailable');
...
      callback := TChatCallback.Create(Client,IChatCallback);
      Service.Join(pseudo,callback);
...
    try
      repeat
        readln(msg);
        if msg='' then
          break;
        Service.BlaBla(pseudo,msg);
      until false;
    finally
      callback := nil; // will unsubscribe from the remote publisher
      Service := nil;  // release the service local instance BEFORE Client.Free
    end;

You could easily implement more complex publish/subscribe mechanisms, including filtering, time to live or tuned broadcasting, by storing some additional information along with the interface instance (e.g. some value to filter, a timestamp). A dynamic array of dedicated records, or a list of class instances, may be used to store the subscribers' expectations.
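
As an illustration of the record-based variant, here is a minimal stand-alone sketch in which each subscription stores the callback together with a value to filter on. All names (IChatListener, TSubscription, its Room field, TCountingListener, Subscribe and Publish) are hypothetical and only serve this example; the calls are made in-process, without the framework.

```pascal
program FilteredSubscribersSketch;

{$ifdef FPC}{$mode delphi}{$endif}
{$ifdef MSWINDOWS}{$APPTYPE CONSOLE}{$endif}

type
  // hypothetical callback interface, standing in for IChatCallback
  IChatListener = interface
    procedure BlaBla(const pseudo, msg: string);
  end;

  // one subscription: the callback plus the data the publisher filters on
  TSubscription = record
    Callback: IChatListener;
    Room: string; // hypothetical filter value
  end;
  TSubscriptionArray = array of TSubscription;

  TCountingListener = class(TInterfacedObject, IChatListener)
  public
    Received: Integer;
    procedure BlaBla(const pseudo, msg: string);
  end;

procedure TCountingListener.BlaBla(const pseudo, msg: string);
begin
  Inc(Received);
  WriteLn('@', pseudo, ' ', msg);
end;

// registers a callback for the messages of one room
procedure Subscribe(var list: TSubscriptionArray; const aRoom: string;
  const aCallback: IChatListener);
var
  n: Integer;
begin
  n := Length(list);
  SetLength(list, n + 1);
  list[n].Callback := aCallback;
  list[n].Room := aRoom;
end;

// notifies only the subscribers of the given room, and returns how many
function Publish(const list: TSubscriptionArray;
  const aRoom, pseudo, msg: string): Integer;
var
  i: Integer;
begin
  Result := 0;
  for i := 0 to High(list) do
    if list[i].Room = aRoom then
    begin
      list[i].Callback.BlaBla(pseudo, msg);
      Inc(Result);
    end;
end;

var
  subscriptions: TSubscriptionArray;
  first, second: TCountingListener;
  keepFirst, keepSecond: IChatListener; // keep both instances alive
  notified: Integer;
begin
  first := TCountingListener.Create;
  keepFirst := first;
  second := TCountingListener.Create;
  keepSecond := second;
  Subscribe(subscriptions, 'delphi', keepFirst);
  Subscribe(subscriptions, 'fpc', keepSecond);
  notified := Publish(subscriptions, 'delphi', 'bob', 'hello');
  WriteLn('notified: ', notified, ', first: ', first.Received,
    ', second: ', second.Received);
end.
```

A time to live could be handled the same way, by adding a timestamp field to the record and skipping or removing expired entries in Publish.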

If you compare with existing client/server SOA solutions (in Delphi, Java, C# or even in Go or other frameworks), this interface-based callback mechanism sounds pretty unique and easy to work with.
In fact, this is a good way of implementing callbacks conforming to SOLID design principles on the server side, and let the mORMot framework publish this mechanism in a client/server way, by using WebSockets. The very same code could be used on the server side, with no transmission nor marshaling overhead (via direct interface calls), and over a network, with optimized use of resource and bandwidth (via "fake" interface calls, and JSON marshalling over TCP/IP).

Ensure you take a look at the corresponding online documentation, which would be updated more often than this blog article!
The ORM part of the framework uses this Publish-Subscribe feature to implement Real-Time master/slave replication.
Feedback is welcome on our forum, as usual.