Grid Market

1. Outline

Actors

In this section, we will implement a Grid Market, a trading market for computing resources.

We'll create an online Makret, where you can buy and sell the computing resources, which would be helpful to build Grid Computing systems cheaply. Also, we will build Consumer and Supplier systems. At last, we would build a Monitor system who can observe all transactions occured in the Market.

Of course, this project is a type of demo project designed to help you learning about the TGrid, so it doesn't really cost you to trading the computing powers. However, the notion of cross trading of computing resources is not a fiction. All the transactions through the Market, consumptions and supplies of computing resources between Consumers and Suppliers, is not a fiction but the real story.

  • Market: An intermediary market where you can trade computing resources
  • Consumer: Purchases and uses computing powers from Suppliers
  • Supplier: Provides its computing power to a Consumer
  • Monitor: Observes all transactions in the Market.

2. Design

2.1. Participants

2.1.1. Market

Market is a main server who represents a brokerage market where you can trade computing power.

Market is a web-socket server accepting Consumers, Suppliers and Monitors as clients. Market intermediates, between Consumers and Suppliers, not only computing power transactions, but also network communications.

Also, the list of all Consumers and Suppliers participating in the Market and transaction details are reported to the Monitors in real time.

2.1.2. Consumer

A Consumer who purchases and uses Suppliers' computing resources.

Consumer buys and consumes Suppliers' computing resources to build a Grid Computing system. If a Consumer succeeded to buying Suppliers' computing resources, the Consumer delivers source codes to each Supplier. Each Supplier compiles the delivered source and mounts the compiled program on to a new Worker program.

The Consumer interacts with those Worker programs.

2.1.3. Supplier

Supplier provides its computing resources to a Consumer.

Supplier provides its computing resources to a Consumer and receives money in return. Of course, this project is a demo project for learning about the TGrid, so it doesn't really return money. In addition, participating in the Market as a Supplier is very simple. Just opens a web browser and connects to a specific URL, that's all.

Also, when the deal with Consumer is determined, the Supplier would get source code from the Consumer. Supplier compiles the delivered source code and mount the program on a new Worker. The Worker program would interact with the Consumer.

2.1.4. Monitor

Monitor observes all the transaction occured in the Market.

Monitor gets list of all the participants; Consumers and Suppliers. Also, the Monitor observes all of the transactions between Consumers and Suppliers from the Market.

2.2. Controllers

2.2.1. Market

There's a Controller defining provided features from Market to Consumers.

Consumer utilizes this Controller for two main things. The first is to knowing which Suppliers are participating in the Market; getSuppliers(). The second is to buying those Suppliers' resources (buyResource()) and using them (assignees).

export namespace ConsumerChannel
{
    export interface IController
    {
        /**
         * `Controller`s of each `Provider` from each *Consumer*.
         */
        assignees: ArrayLike<Supplier.IController>;

        /**
         * Get unique identifier of the *Consumer*.
         */
        getUID(): number;

        /**
         * Entire information list of *Suppliers* in the *Market*.
         */
        getSuppliers(): ISupplier[];

        /**
         * Buy computing resource from a *Suppiler*.
         * 
         * @param uid Unique identifier of the target *Supplier*.
         * @return Whether Succeded to acquire or not.
         */
        buyResource(uid: number): boolean;
    }
}

Another Controller defining provided features from Market to Suppliers.

There're only two features that provided from Market to Supplier. The first is to getting unique identifier assigned to the Supplier. The other one is a variable named provder, be a Provider from Supplier based on the Consumer, also be a Driver<Controller> based on the Supplier.

  • Consumer: WebConnector<Provider>.getProvider()
  • Supplier: WebConnector.getDriver<Controller>()
export namespace SupplierChannel
{
    export interface IController
    {
        /**
         * A provider from Consumer
         */
        provider: object | null;

        /**
         * Get unique identifier of the *Supplier*.
         */
        getUID(): number;
    }
}

2.2.2. Consumer

The Provider from Market to Consumer just passes through the Consumer as an intermediary. It can be essentially regarded as provided to the Supplier. Actullay, Supplier connects to Market server and utilizes provided remote functions from Consumer's Provider thorugh the provder: object variable who is defined in the Servant.IController.

export namespace Consumer
{
    export interface IController
    {
        /**
         * List of providers for connected suppliers with the consumer.
         */
        servants: ArrayLike<Servant.IController>;
    }
}

export namespace Servant
{
    export interface IController
    {
        /**
         * A provider from consumer.
         */
        provider: object;

        /**
         * Join connection with the consumer.
         */
        join(): void;

        /**
         * Close connection with the consumer.
         */
        close(): void;
    }
}

2.2.3. Supplier

The Provider from Supplier to Market, just passes through Market as an intermediary. It can be essentially regarded as provided to the Consumer. Actually, Consumer connects to Market server and utilizes provided remote functions from the Suplier's Provider through the assignees: ArrayLike<Supplier.IController> variable which is defined in the ConsumerChannel.IController.

So you can see that all of the features defined in the Controller, an interface defining provided featrues from Supplier to Market (actually Consumer), are concentrating on the Consumer. Looking at the functions defined in the Controller, at first, there's a function assign() that informing which Consumer would receive computing resources from the Supplier. At next, a compile() function, compiling source code delivered from the Consumer and mounting it to a new Worker program, exists.

At last, you can see the provider object. This object represents a Provider from Worker program, compiled by source code delivered from the Consumer, to Consumer. Within framework of the Consumer or main program of the Supplier, it would be a type of Driver<Controller>.

  • WorkerServer<Provider>.getProvider()
  • WorkerConnector.getDriver<Controller>()
export namespace Supplier
{
    export interface IController
    {
        /**
         * A Provider from the Worker program.
         * 
         * Supplier compiles a source code delivered from the Consumer and mounts it to a new
         * Worker program. The object `provider` represents a Provider from the Worker program.
         * Within framework of the Consumer or main program of the Supplier, it would be a type
         * of Driver<Controller>.
         * 
         *   - {@link WorkerServer.getProvider}
         *   - {@link WorkerConnector.getDriver}
         * 
         * @warning Must be {@link compile compiled} before.
         */
        provider: object;

        /**
         * Assign a consumer to receive computing resources of this Supplier.
         */
        assign(consumer_uid: number): void;

        /**
         * Compiles source code and mount a new Worker program.
         * 
         * @param script Source code from the Consumer
         * @param args Arguments of the main function
         */
        compile(script: string, ...args: string[]): void;

        /**
         * Close the mounted Worker program.
         */
        close(): void;
    }
}

2.2.4. Monitor

There's a Provider from Monitor to Market. This Provider is designed only for one purpose and it can be represented by only one word: "Market, let me know everything happening to you". Therefore, All of the functions defined in the Provider and its Controller are designed only to informing what's happening in the Market to the Monitor.

Monitor can observe all of the transactions occured in the Market between Consumers and Suppliers. In means that whenever a Consumer buys a Supplier's computing resources, the Market informs the transaction to Monitors; transact(). Also, whenever a Consumer completes its computations and releases the Suppliers' computing resources, Market informs it to Monitors, too; release().

In addiction, Monitor can observe the full list of Consumers and Suppliers participating in the Market. When a Monitor connects to the Market server, the Market delivers entire participants list to the Monitor by calling the assign(). After that, whenever a participant enters or exits, the Market will inform it by calling relevant method like insertConsumer() or eraseSupplier(), etc.

export namespace Monitor
{
    export interface IController
    {
        /**
         * Assign all of the participants in the Market.
         * 
         * @param consumers List of Consumers' nodes
         * @param suppliers List of Suppliers' nodes
         */
        assign(consumers: IConsumerNode[], suppliers: ISupplierNode[]): void;

        /**
         * A Consumer has bought a Supplier's computing resources
         * 
         * @param consumer Unique identifier of the Consumer
         * @param supplier Unique identifier of the Supplier
         */
        transact(consumer: number, supplier: number): void;

        /**
         * A Consumer hasreleased computing resources of Suppliers had bought.
         * 
         * @param consumer Unique identifier of the Consumer
         */
        release(consumer: number): void;

        //----
        // INDIVIDUAL I/O
        //----
        /**
         * A Consumer has newly entered.
         * 
         * @param consumer Information about the Customer.
         */
        insertConsumer(consumer :IConsumerNode): void;

        /**
         * A Supplier has newly entered.
         *
         * @param supplier Information about the Supplier.
         */
        insertSupplier(supplier: ISupplierNode): void;

        /**
         * A Consumer has left.
         * 
         * @param uid Unique identifier of the Consumer
         */
        eraseConsumer(uid: number): void;

        /**
         * A Supplier has left.
         * 
         * @param uid Unique identifier of the Supplier
         */
        eraseSupplier(uid: number): void;
    }
}

2.3. Class Diagram

Class Diagram

3. Core Implementation

3.1. Market

Market is an intermediary market where Consumers and Suppliers can trade their computing resources.

Therefore, implementation code of the Market class starts with opening a websocket server. And whenever a client connects to the Market server, the Market class identifies its role baased on connection path and supports the client by creating a responsible class.

Path Role Generated Class
/consumer Consumer ConsummerChannel
/supplier Supplier SupplierChannel
/monitor Monitor Driver

core/market/Market.ts

import { HashMap } from "tstl/container/HashMap";
import { WebServer } from "tgrid/protocols/web/WebServer";
import { WebAcceptor } from "tgrid/protocols/web/WebAcceptor";
import { Driver } from "tgrid/components/Driver";

import { ConsumerChannel } from "./ConsumerChannel";
import { SupplierChannel } from "./SupplierChannel";
import { Monitor } from "../monitor/Monitor";

export class Market
{
    private server_: WebServer<Provider>;

    private consumers_: HashMap<number, ConsumerChannel>;
    private suppliers_: HashMap<number, SupplierChannel>;
    private monitors_: HashMap<number, Driver<Monitor.IController>>;

    private static sequence_: number = 0;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * Default Constructor.
     */
    public constructor()
    {
        this.server_ = new WebServer<Provider>();

        this.consumers_ = new HashMap();
        this.suppliers_ = new HashMap();
        this.monitors_ = new HashMap();
    }

    public open(port: number): Promise<void>
    {
        return this.server_.open(port, async (acceptor: WebAcceptor<Provider>) =>
        {
            let uid: number = ++Market.sequence_;
            if (acceptor.path === "/monitor")
            {
                await this._Handle_monitor(uid, acceptor);
                return;
            }

            //----
            // PRELIMINARIES
            //----
            // DETERMINE ACTOR
            let instance: Instance;
            let dictionary: HashMap<number, Instance>;

            // MONITOR HANDLER
            let monitor_inserter: (drvier: Driver<Monitor.IController>) => Promise<void>;
            let monitor_eraser: (drvier: Driver<Monitor.IController>) => Promise<void>;

            // PARSE PATH
            if (acceptor.path === "/consumer")
            {
                instance = await ConsumerChannel.create(uid, this, acceptor as WebAcceptor<ConsumerChannel.Provider>);
                dictionary = this.consumers_;

                monitor_inserter = driver => driver.insertConsumer((instance as ConsumerChannel).toNode());
                monitor_eraser = driver => driver.eraseConsumer(uid);
            }
            else if (acceptor.path === "/supplier")
            {
                instance = await SupplierChannel.create(uid, acceptor as WebAcceptor<SupplierChannel.Provider>);
                dictionary = this.suppliers_;

                monitor_inserter = driver => driver.insertSupplier((instance as SupplierChannel).toNode());
                monitor_eraser = driver => driver.eraseSupplier(uid);
            }
            else
            {
                acceptor.reject(404, "Invalid URL");
                return;
            }

            //----
            // PROCEDURES
            //----
            // ENROLL TO DICTIONARY
            dictionary.emplace(uid, instance);
            console.log("A participant has come", this.consumers_.size(), this.suppliers_.size());

            // INFORM TO MONITORS
            for (let entry of this.monitors_)
                monitor_inserter(entry.second).catch(() => {});

            //----
            // DISCONNECTION
            //----
            // JOIN CONNECTION
            try 
            { 
                await acceptor.join(); 
            } 
            catch {}

            // ERASE ON DICTIONARY
            dictionary.erase(uid);
            console.log("A participant has left", this.consumers_.size(), this.suppliers_.size());

            // INFORM TO MONITORS
            for (let entry of this.monitors_)
                monitor_eraser(entry.second).catch(() => {});
        });
    }

    public async close(): Promise<void>
    {
        await this.server_.close();

        this.consumers_.clear();
        this.suppliers_.clear();
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    public getSuppliers(): HashMap<number, SupplierChannel>
    {
        return this.suppliers_;
    }

    public getMonitors(): HashMap<number, Driver<Monitor.IController>>
    {
        return this.monitors_;
    }

    /* ----------------------------------------------------------------
        MONITOR HANDLER
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private async _Handle_monitor(uid: number, acceptor: WebAcceptor<{}>): Promise<void>
    {
        console.log("A monitor has come", this.monitors_.size());

        // ACCEPT CONNECTION
        let driver: Driver<Monitor.IController> = acceptor.getDriver<Monitor.IController>();
        await acceptor.accept(null);

        this.monitors_.emplace(uid, driver);

        //----
        // SEND CURRENT RELATIONSHIP
        //----
        // DO ASSIGN
        await driver.assign
        (
            [...this.consumers_].map(it => it.second.toNode()), 
            [...this.suppliers_].map(it => it.second.toNode())
        );

        //----
        // JOIN CONNECTION
        //----
        await acceptor.join();
        this.monitors_.erase(uid);

        console.log("A monitor has left", this.monitors_.size());
    }
}

type Provider = ConsumerChannel.Provider | SupplierChannel.Provider;
type Instance = ConsumerChannel | SupplierChannel;

ConsumerChannel is a class designed to corresponding a Consumer who connects to the Market server as a client.

Market server program records and manages list of computing resources of Suppliers purchased by the Consumer through the ConsuerChannel class. Also, through the ConsumerChannel.Provider class, the Consumer can list up information of Suppliers, those who are connecting in the Market, and purchase their computing resources.

core/market/ConsumerChannel.ts

import { WebAcceptor } from "tgrid/protocols/web/WebAcceptor";
import { HashMap } from "tstl/container/HashMap";
import { ArrayDict } from "../../utils/ArrayDict";
import { DateUtil } from "../../utils/DateUtil";

import { IConsumerNode } from "../monitor/IConsumerNode";
import { ISupplier } from "../supplier/ISupplier";
import { SupplierChannel } from "./SupplierChannel";

import { Market } from "./Market";
import { Supplier } from "../supplier/Supplier";
import { Consumer } from "../consumer/Consumer";

export class ConsumerChannel
{
    public readonly uid: number;
    public readonly created_at: Date;

    /**
     * @hidden
     */
    private market_: Market;

    /**
     * @hidden
     */
    private acceptor_: WebAcceptor<ConsumerChannel.Provider>;

    /**
     * @hidden
     */
    private servants_: HashMap<number, SupplierChannel>;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(uid: number, market: Market, acceptor: WebAcceptor<ConsumerChannel.Provider>)
    {
        this.uid = uid;
        this.market_ = market;
        this.acceptor_ = acceptor;

        this.created_at = new Date();
        this.servants_ = new HashMap();
    }

    /**
     * @internal
     */
    public static async create(uid: number, market: Market, acceptor: WebAcceptor<ConsumerChannel.Provider>): Promise<ConsumerChannel>
    {
        let ret: ConsumerChannel = new ConsumerChannel(uid, market, acceptor);
        await ret.acceptor_.accept(new ConsumerChannel.Provider(ret));

        ret._Handle_disconnection();
        return ret;
    }

    /**
     * @hidden
     */
    private async _Handle_disconnection(): Promise<void>
    {
        try { await this.acceptor_.join(); } catch {}
        for (let it of this.servants_)
            await it.second.release(this);
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    /**
     * @internal
     */
    public getMarket(): Market
    {
        return this.market_;
    }

    public getDriver()
    {
        return this.acceptor_.getDriver<Consumer.IController>();
    }

    /**
     * @internal
     */
    public getAssignees()
    {
        return this.servants_;
    }

    public toNode(): IConsumerNode
    {
        return {
            uid: this.uid,
            created_at: DateUtil.to_string(this.created_at, true),
            servants: [...this.servants_].map(it => it.first)
        };
    }

    /* ----------------------------------------------------------------
        SUPPLIERS I/O
    ---------------------------------------------------------------- */
    /**
     * @internal
     */
    public async transact(supplier: SupplierChannel): Promise<boolean>
    {
        if (this.servants_.has(supplier.uid) || // DUPLICATED
            await supplier.transact(this) === false) // MONOPOLIZED
            return false;

        // CONSTR5UCT SERVANT
        await this.servants_.emplace(supplier.uid, supplier);

        // PROVIDER FOR CONSUMER (SERVANT) := CONTROLLER OF SUPPLIER
        let provider = supplier.getDriver();
        this.acceptor_.getProvider()!.assginees.set(supplier.uid, provider);

        // RETURN WITH ASSIGNMENT
        await provider.assign(this.uid);
        for (let entry of this.market_.getMonitors())
            entry.second.transact(this.uid, supplier.uid).catch(() => {});

        return true;
    }

    /**
     * @internal
     */
    public async release(supplier: SupplierChannel): Promise<void>
    {
        this.servants_.erase(supplier.uid);
        this.acceptor_.getProvider()!.assginees.erase(supplier.uid);

        await supplier.release(this);
        for (let entry of this.market_.getMonitors())
            entry.second.release(supplier.uid).catch(() => {});
    }
}

export namespace ConsumerChannel
{
    /**
     * @hidden
     */
    export interface IController
    {
        assginees: ArrayLike<Supplier.IController>;

        getUID(): number;
        getSuppliers(): ISupplier[];
        buyResource(supplier: ISupplier): Promise<boolean>;
    }

    /**
     * @hidden
     */
    export class Provider implements IController
    {
        private consumer_: ConsumerChannel;
        private market_: Market;
        public assginees: ArrayDict<Supplier.IController>;

        public constructor(consumer: ConsumerChannel)
        {
            this.consumer_ = consumer;
            this.market_ = consumer.getMarket();
            this.assginees = new ArrayDict();
        }

        public getUID(): number
        {
            return this.consumer_.uid;
        }

        public getSuppliers(): ISupplier[]
        {
            return this.market_.getSuppliers().toJSON().map(entry => entry.second.toJSON());
        }

        public async buyResource(supplier: ISupplier): Promise<boolean>
        {
            let map = this.market_.getSuppliers();
            let it = map.find(supplier.uid);

            if (it.equals(map.end()) === true)
                return false;

            return await this.consumer_.transact(it.second);
        }
    }
}

SupplierChannel is a class designed to corresponding a Supplier who connects to the Market server as a client.

Market server program records and manages performance information about the Supplier through the SupplierChannel class. Also, Consumer who purchased the Supplier's computing resource is also wrote on the SupplierChannel class.

Also, by the SupplierChannel.Provider class, the Consumer can remotely call functions, provided from the Supplier (of Worker program), through the Driver<Controller>.

core/market/SupplierChannel.ts

import { WebAcceptor } from "tgrid/protocols/web/WebAcceptor";
import { Driver } from "tgrid/components/Driver";
import { Mutex } from "tstl/thread/Mutex";
import { UniqueLock } from "tstl/thread/UniqueLock";
import { DateUtil } from "../../utils/DateUtil";

import { ISupplier } from "../supplier/ISupplier";
import { ISupplierNode } from "../monitor/ISupplierNode";
import { Supplier } from "../supplier/Supplier";

import { ConsumerChannel } from "./ConsumerChannel";

export class SupplierChannel implements Readonly<ISupplier>
{
    public readonly uid: number;
    public readonly created_at: Date;
    public readonly performance: ISupplier.IPerformance;

    private acceptor_: WebAcceptor<SupplierChannel.Provider>;
    private mutex_: Mutex;
    private consumer_: ConsumerChannel | null;
    private assigned_at_: Date | null;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(uid: number, acceptor: WebAcceptor<SupplierChannel.Provider>)
    {
        this.uid = uid;
        this.created_at = new Date();

        this.acceptor_ = acceptor;
        this.performance = 
        {
            mean: 1.0,
            risk: 0.0,
            credit: 0.0
        };

        this.mutex_ = new Mutex();
        this.consumer_ = null;
        this.assigned_at_ = null;
    }

    /**
     * @internal
     */
    public static async create(uid: number, acceptor: WebAcceptor<SupplierChannel.Provider>): Promise<SupplierChannel>
    {
        let ret: SupplierChannel = new SupplierChannel(uid, acceptor);
        await ret.acceptor_.accept(new SupplierChannel.Provider(ret));

        ret._Handle_disconnection();
        return ret;
    }

    /**
     * @hidden
     */
    private async _Handle_disconnection(): Promise<void>
    {
        try { await this.acceptor_.join(); } catch {}
        await UniqueLock.lock(this.mutex_, async () =>
        {
            if (this.consumer_ !== null)
                this.consumer_.release(this);
        });
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    /**
     * @inheritDoc
     */
    public get free(): boolean
    {
        return this.consumer_ === null;
    }

    public getDriver(): Driver<Supplier.IController>
    {
        return this.acceptor_.getDriver<Supplier.IController>();
    }

    public getConsumer(): ConsumerChannel | null
    {
        return this.consumer_;
    }

    public toJSON(): ISupplier
    {
        let ret: ISupplier = 
        {
            uid: this.uid,
            performance: this.performance,
            free: this.free
        };
        return ret;
    }

    public toNode(): ISupplierNode
    {
        return {
            uid: this.uid,
            created_at: DateUtil.to_string(this.created_at, true),
            assigned_at: this.assigned_at_
                ? DateUtil.to_string(this.assigned_at_, true)
                : null
        };
    }

    /* ----------------------------------------------------------------
        ASSIGNER
    ---------------------------------------------------------------- */
    /**
     * @internal
     */
    public async transact(consumer: ConsumerChannel): Promise<boolean>
    {
        let ret: boolean;
        await UniqueLock.lock(this.mutex_, () =>
        {
            if ((ret = this.free) === true)
            {
                this.consumer_ = consumer;
                this.assigned_at_ = new Date();
                this.acceptor_.getProvider()!.provider = consumer.getDriver().servants[this.uid];
            }
        });
        return ret!;
    }

    /**
     * @internal
     */
    public async release(consumer: ConsumerChannel): Promise<void>
    {
        await UniqueLock.lock(this.mutex_, async () =>
        {
            if (this.consumer_ === consumer)
            {
                // ERASE CONSUMER
                this.consumer_ = null;
                this.assigned_at_ = null;
                this.acceptor_.getProvider()!.provider = null;

                // TO ANTICIPATE ABUSING
                this.getDriver().close().catch(() => {});
            }
        });
    }
}

export namespace SupplierChannel
{
    export interface IController
    {
        provider: object | null;
        getUID(): number;
    }

    export class Provider implements IController
    {
        private channel_: SupplierChannel;

        // PROVIDER FOR SUPPLIER := CONTROLLER OF CONSUMER (SERVANT)
        public provider: object | null = null;

        public constructor(channel: SupplierChannel)
        {
            this.channel_ = channel;
        }

        public getUID(): number
        {
            return this.channel_.uid;
        }
    }
}

3.2. Consumer

The Consumer is a facade class designed for the Consumer.

Consumer can participate in the Market by calling the Consumer.participate() method. After that, the Consumer can list up all of the Suppliers participating in the Market by calling the Consumer.getSuppliers() method and also can purchase some of their computing resources by calling the Consumer.buyResource() method.

core/consumer/Consumer.ts

import { WebConnector } from "tgrid/protocols/web/WebConnector";
import { Driver } from "tgrid/components/Driver";
import { ArrayDict } from "../../utils/ArrayDict";

import { Servant } from "./Servant";
import { ISupplier } from "../supplier/ISupplier";
import { ConsumerChannel } from "../market/ConsumerChannel";

export class Consumer
{
    public readonly uid: number;
    private connector_: WebConnector<Consumer.Provider>;

    private market_: Driver<ConsumerChannel.IController>;
    private servants_: Servant[];

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    private constructor(uid: number, connector: WebConnector<Consumer.Provider>)
    {
        this.uid = uid;
        this.connector_ = connector;

        this.market_ = connector.getDriver<ConsumerChannel.IController>();
        this.servants_ = [];
    }

    public static async participate(url: string): Promise<Consumer>
    {
        let provider: Consumer.Provider = new Consumer.Provider();
        let connector: WebConnector<Consumer.Provider> = new WebConnector(provider);
        await connector.connect(url);

        let uid: number = await connector.getDriver<ConsumerChannel.IController>().getUID();
        return new Consumer(uid, connector);
    }

    public leave(): Promise<void>
    {
        return this.connector_.close();
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    public getSuppliers(): Promise<ISupplier[]>
    {
        return this.market_.getSuppliers();
    }

    public async buyResource(base: ISupplier): Promise<Servant | null>
    {
        if (await this.market_.buyResource(base) === false)
            return null;

        let driver = this.market_.assginees[base.uid];
        let ret: Servant = Servant.create(base, driver);

        ret.join().then(() =>
        {
            for (let i: number = 0; i < this.servants_.length; ++i)
                if (this.servants_[i].uid === ret.uid)
                {
                    this.servants_.splice(i, 1);
                    break;
                }
            this.connector_.getProvider()!.servants.erase(ret.uid);
        });

        this.servants_.push(ret);
        this.connector_.getProvider()!.servants.set(ret.uid, new Servant.Provider(ret));

        return ret;
    }
}

export namespace Consumer
{
    /**
     * @internal
     */
    export interface IController
    {
        servants: ArrayLike<Servant.IController>;
    }

    /**
     * @internal
     */
    export class Provider implements IController
    {
        public servants: ArrayDict<Servant.Provider> = new ArrayDict();
    }
}

The Servant class manages the computing resources purchased from Suppliers through the Consumer.buyResource(). The key role of the Servant class is to being a Communicator interacting with the Worker program mounted on the Supplier, even if Market and main program of the Supplier are located between the Consumer and Worker program of the Supplier.

Consumer passes the Provider and source code to the Supplier through the Servant.compile() method. Target Supplier would compile the program code and mount it to a new Worker program. The Worker program is the final instance that would interact with the Consumer program.

core/consumer/Servant.ts

import { Driver } from "tgrid/components/Driver";
import { ConditionVariable } from "tstl/thread/ConditionVariable";

import { ISupplier } from "../supplier/ISupplier";
import { Supplier } from "../supplier/Supplier";

export class Servant implements Readonly<ISupplier>
{
    /**
     * @hidden
     */
    private base_: ISupplier;

    /**
     * @hidden
     */
    private assignee_: Driver<Supplier.IController>;

    /**
     * @hidden
     */
    private joiners_: ConditionVariable;

    /**
     * @hidden
     */
    private provider_?: object | null;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(base: ISupplier, driver: Driver<Supplier.IController>)
    {
        this.base_ = base;
        this.assignee_ = driver;

        this.joiners_ = new ConditionVariable();
    }

    /**
     * @internal
     */
    public static create(base: ISupplier, driver: Driver<Supplier.IController>): Servant
    {
        return new Servant(base, driver);
    }

    public async compile(provider: object | null, script: string, ...args: string[]): Promise<void>
    {
        this.provider_ = provider;
        await this.assignee_.compile(script, ...args);
    }

    public async close(): Promise<void>
    {
        await this.assignee_.close();
        await this.joiners_.notify_all();
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    public get provider(): object | null | undefined
    {
        return this.provider_;
    }

    public getDriver<Controller extends object>(): Driver<Controller>
    {
        return this.assignee_.provider as Driver<Controller>;
    }

    public join(): Promise<void>;
    public join(ms: number): Promise<boolean>;
    public join(until: Date): Promise<boolean>;

    public join(param?: number | Date): Promise<void | boolean>
    {
        if (param === undefined)
            return this.joiners_.wait();
        else if (param instanceof Date)
            return this.joiners_.wait_until(param);
        else
            return this.joiners_.wait_for(param);
    }

    /* ----------------------------------------------------------------
        PROPERTIES
    ---------------------------------------------------------------- */
    public get uid(): number
    {
        return this.base_.uid;
    }
    public get performance(): ISupplier.IPerformance
    {
        return this.base_.performance;
    }
    public get free(): boolean
    {
        return false;
    }
}

export namespace Servant
{
    /**
     * @internal
     */
    export interface IController
    {
        provider: object;

        join(): Promise<void>;
        close(): Promise<void>;
    }

    /**
     * @internal
     */
    export class Provider
    {
        private base_: Servant;

        public constructor(base: Servant)
        {
            this.base_ = base;
        }

        public get provider(): object
        {
            return this.base_.provider!;
        }

        public join(): Promise<void>
        {
            return this.base_.join();
        }

        public close(): Promise<void>
        {
            return this.base_.close();
        }
    }
}

3.3. Supplier

The Supplier is a facde class designed for the Supplier.

Supplier can participate in the Market by calling the Supplier.participate() method. Also, Supplier provides features for Market and Consumer through the Supplier.Provider class defined in the internal namespace.

core/supplier/Supplier.ts

import { EventEmitter } from "events";

import { WebConnector } from "tgrid/protocols/web/WebConnector";
import { WorkerConnector } from "tgrid/protocols/workers/WorkerConnector";
import { Driver } from "tgrid/components/Driver";

import { IPointer } from "tstl/functional/IPointer";
import { SupplierChannel } from "../market/SupplierChannel";

export class Supplier extends EventEmitter
{
    public readonly uid: number;

    /**
     * @hidden
     */
    private connector_: WebConnector<Supplier.Provider>;

    /* ----------------------------------------------------------------
        CONSTRUCTOR
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(uid: number, connector: WebConnector<Supplier.Provider>)
    {
        super();

        this.uid = uid;
        this.connector_ = connector;
    }

    public static async participate(url: string): Promise<Supplier>
    {
        // POINTERS - LAZY CONSTRUCTION
        let basePtr: IPointer<Supplier> = { value: null! };
        let workerPtr: IPointer<WorkerConnector> = { value: null! };

        // PREPARE ASSETS
        let provider = new Supplier.Provider(basePtr, workerPtr);
        let connector: WebConnector<Supplier.Provider> = new WebConnector(provider);
        let driver: Driver<SupplierChannel.IController> = connector.getDriver<SupplierChannel.IController>();

        // CONSTRUCT WORKER
        let worker: WorkerConnector = new WorkerConnector(driver.provider);
        workerPtr.value = worker;

        // CONNECTION & CONSTRUCTION
        await connector.connect(url);
        let ret: Supplier = new Supplier(await driver.getUID(), connector);
        basePtr.value = ret;

        // RETURNS
        return ret;
    }

    public leave(): Promise<void>
    {
        return this.connector_.close();
    }
}

export namespace Supplier
{
    /**
     * @internal
     */
    export interface IController
    {
        provider: object;

        assign(consumerUID: number): void;
        compile(script: string, ...args: string[]): Promise<void>;
        close(): Promise<void>;
    }

    /**
     * @internal
     */
    export class Provider implements IController
    {
        private base_ptr_: IPointer<Supplier>;
        private worker_ptr_: IPointer<WorkerConnector>;
        private consumer_uid_?: number;

        /* ----------------------------------------------------------------
            CONSTRUCTOR
        ---------------------------------------------------------------- */
        public constructor(basePtr: IPointer<Supplier>, workerPtr: IPointer<WorkerConnector>)
        {
            this.base_ptr_ = basePtr;
            this.worker_ptr_ = workerPtr;
        }

        public assign(consumerUID: number): void
        {
            this.consumer_uid_ = consumerUID;
            this.base_ptr_.value.emit("assign", consumerUID);
        }

        public async compile(code: string, ...args: string[]): Promise<void>
        {
            // FOR SAFETY
            let state = this.worker_ptr_.value.state;
            if (state !== WorkerConnector.State.NONE && state !== WorkerConnector.State.CLOSED)
                await this.worker_ptr_.value.close();

            // DO COMPILE
            await this.worker_ptr_.value.compile(code, ...args);

            // EMIT EVENTS
            this.base_ptr_.value.emit("compile", this.consumer_uid_, code, ...args);
            this.worker_ptr_.value.join().then(() =>
            {
                this.base_ptr_.value.emit("close", this.consumer_uid_);
            });
        }

        public close(): Promise<void>
        {
            return this.worker_ptr_.value.close();
        }

        /* ----------------------------------------------------------------
            ACCESSORS
        ---------------------------------------------------------------- */
        public get provider(): Driver<object>
        {
            return this.worker_ptr_.value.getDriver<object>();
        }

        public isFree(): boolean
        {
            return this.worker_ptr_.value.state === WorkerConnector.State.NONE
                || this.worker_ptr_.value.state === WorkerConnector.State.CLOSED;
        }
    }
}

In addition, identifier and performance information about a Supplier can be summarized as ISupplier structure. Consumer references the ISupplier information and determines whether purchase the Supplier's computing resources or not.

core/supplier/ISupplier.ts

export interface ISupplier
{
    uid: number;
    performance: ISupplier.IPerformance;
    free: boolean;
}
export namespace ISupplier
{
    export interface IPerformance
    {
        mean: number;
        risk: number;
        credit: number;
    }
}

3.4. Monitor

The Monitor is a facade class designed for the Monitor.

Monitor can participate in the Market by calling the Monitor.participate() method. Also, Monitor can observe all transactions occured in the Market by providing a Monitor.Provider object to the Market server.

In other words, whenever transaction or participants I/O occured in the Market, Market informs it to Monitors by calling related function remotely through the object.

`core/monitor/Monitor.ts

import { WebConnector } from "tgrid/protocols/web";

import { EventEmitter } from "events";
import { HashMap } from "tstl/container/HashMap";
import { ConditionVariable } from "tstl/thread/ConditionVariable";
import { IPointer } from "tstl/functional/IPointer";

import { ConsumerNode } from "./ConsumerNode";
import { SupplierNode } from "./SupplierNode";
import { IConsumerNode } from "./IConsumerNode";
import { ISupplierNode } from "./ISupplierNode";

export class Monitor
{
    /**
     * @hidden
     */
    private connector_: WebConnector;

    /**
     * @hidden
     */
    private consumers_: HashMap<number, ConsumerNode>;

    /**
     * @hidden
     */
    private suppliers_: HashMap<number, SupplierNode>;

    /**
     * @hidden
     */
    private emitter_: EventEmitter;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(connector: WebConnector)
    {
        this.connector_ = connector;

        this.consumers_ = new HashMap();
        this.suppliers_ = new HashMap();
        this.emitter_ = new EventEmitter();
    }

    /**
     * @internal
     */
    public static async participate(url: string): Promise<Monitor>
    {
        // PREPARE ASSETS
        let ptr: IPointer<Monitor> = { value: null! };
        let waitor: ConditionVariable = new ConditionVariable();

        let provider: Monitor.Provider = new Monitor.Provider(ptr, waitor);
        let connector: WebConnector = new WebConnector(provider);

        // LAZY CREATION
        ptr.value = new Monitor(connector);

        // CONNECT & WAIT MARKET
        await connector.connect(url);
        await waitor.wait();

        // RETURNS
        return ptr.value;
    }

    public leave(): Promise<void>
    {
        return this.connector_.close();
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    public on(type: "refresh", listener: (consumers: HashMap<number, ConsumerNode>, suppliers: HashMap<number, SupplierNode>) => void): void
    {
        this.emitter_.on(type, listener);
    }

    public getConsumers(): HashMap<number, ConsumerNode>
    {
        return this.consumers_;
    }

    public getSuppliers(): HashMap<number, SupplierNode>
    {
        return this.suppliers_;
    }

    /**
     * @internal
     */
    public _Refresh(): void
    {
        this.emitter_.emit("refresh", this.consumers_, this.suppliers_);
    }
}

export namespace Monitor
{
    /**
     * @internal
     */
    export interface IController
    {
        assign(consumers: IConsumerNode[], suppliers: ISupplierNode[]): Promise<void>;

        insertConsumer(consumer :IConsumerNode): void;
        insertSupplier(supplier: ISupplierNode): void;
        eraseConsumer(uid: number): void;
        eraseSupplier(uid: number): void;

        transact(consumer: number, supplier: number): void;
        release(uid: number): void;
    }

    /**
     * @internal
     */
    export class Provider implements IController
    {
        private ptr_: IPointer<Monitor>;
        private waitor_: ConditionVariable;

        /* ----------------------------------------------------------------
            CONSTRUCTORS
        ---------------------------------------------------------------- */
        public constructor(ptr: IPointer<Monitor>, waitor: ConditionVariable)
        {
            this.ptr_ = ptr;
            this.waitor_ = waitor;
        }

        public async assign(rawConsumers: IConsumerNode[], rawSuppliers: ISupplierNode[]): Promise<void>
        {
            let base: Monitor = this.ptr_.value;
            for (let raw of rawSuppliers)
                base.getSuppliers().emplace(raw.uid, new SupplierNode(raw));

            for (let raw of rawConsumers)
            {
                let consumer: ConsumerNode = new ConsumerNode(raw);
                for (let uid of raw.servants)
                {
                    let supplier: SupplierNode = base.getSuppliers().get(uid);
                    consumer.servants.emplace(uid, supplier);
                }
                base.getConsumers().emplace(raw.uid, consumer);
            }
            await this.waitor_.notify_all();
        }

        /* ----------------------------------------------------------------
            ELEMENTS I/O
        ---------------------------------------------------------------- */
        public insertConsumer(raw: IConsumerNode): void
        {
            let base: Monitor = this.ptr_.value;
            let consumer: ConsumerNode = new ConsumerNode(raw);

            base.getConsumers().emplace(consumer.uid, consumer);
            base._Refresh();
        }

        public insertSupplier(raw: ISupplierNode): void
        {
            let base: Monitor = this.ptr_.value;
            base.getSuppliers().emplace(raw.uid, new SupplierNode(raw));
            base._Refresh();
        }

        public eraseConsumer(uid: number): void
        {
            let base: Monitor = this.ptr_.value;

            let consumer: ConsumerNode = base.getConsumers().get(uid); 
            for (let entry of consumer.servants)
                entry.second.release();

            base.getConsumers().erase(uid);
            base._Refresh();
        }

        public eraseSupplier(uid: number): void
        {
            let base: Monitor = this.ptr_.value;

            let supplier: SupplierNode = base.getSuppliers().get(uid);
            if (supplier.assignee !== null)
                supplier.assignee.servants.erase(uid);

            supplier.release();
            base.getSuppliers().erase(uid);

            base._Refresh();
        }

        /* ----------------------------------------------------------------
            RELATIONSHIPS
        ---------------------------------------------------------------- */
        public transact(customerUID: number, supplierUID: number): void
        {
            let base: Monitor = this.ptr_.value;
            let consumer: ConsumerNode = base.getConsumers().get(customerUID);
            let supplier: SupplierNode = base.getSuppliers().get(supplierUID);

            consumer.servants.emplace(supplier.uid, supplier);
            supplier.assign(consumer);

            base._Refresh();
        }

        public release(uid: number): void
        {
            let base: Monitor = this.ptr_.value;

            let supplier: SupplierNode = base.getSuppliers().get(uid);
            if (supplier.assignee !== null)
                supplier.assignee.servants.erase(uid);

            supplier.release();
            base._Refresh();
        }
    }
}

The ConsumerNode is a class designed to represent a Consumer who is participating in the Market. It also records information about the Suppliers purchased by the Consumer.

core/monitor/ConsumerNode.ts

import { HashMap } from "tstl/container/HashMap";
import { SupplierNode } from "./SupplierNode";
import { IConsumerNode } from "./IConsumerNode";

export class ConsumerNode
{
    public readonly uid: number;
    public readonly servants: HashMap<number, SupplierNode>;
    public readonly created_at: Date;

    public constructor(raw: IConsumerNode)
    {
        this.uid = raw.uid;
        this.servants = new HashMap();
        this.created_at = new Date(raw.created_at);
    }
}

The SupplierNode is a class designed to represent a Supplier who is participating in the Market. Information about the Consumer who purchased computing resources of the Supplier is also recorded in the SupplierNode class.

core/monitor/SupplierNode.ts

import { ConsumerNode } from "./ConsumerNode";
import { ISupplierNode } from "./ISupplierNode";

export class SupplierNode
{
    public readonly uid: number;
    public readonly created_at: Date;

    /**
     * @hidden
     */
    private assignee_: ConsumerNode | null;

    /**
     * @hidden
     */
    private assigned_at_: Date | null;

    public constructor(raw: ISupplierNode)
    {
        this.uid = raw.uid;
        this.created_at = new Date(raw.created_at);

        this.assignee_ = null;
        this.assigned_at_ = raw.assigned_at
            ? new Date(raw.assigned_at)
            : null;
    }

    public get assignee(): ConsumerNode | null
    {
        return this.assignee_;
    }

    public get assigned_at(): Date | null
    {
        return this.assigned_at_;
    }

    /**
     * @internal
     */
    public assign(obj: ConsumerNode): void
    {
        this.assignee_ = obj;
        this.assigned_at_ = new Date();
    }

    /**
     * @internal
     */
    public release(): void
    {
        this.assignee_ = null;
        this.assigned_at_ = null;
    }
}

results matching ""

    No results matching ""