Grid Market

1. Outline

Actors

이번 단원에서는 Computing 자원을 거래할 수 있는 Grid Market 을 만들어봅니다.

Grid Computing 시스템을 구축하는 데 필요한 연산력을 매매할 수 있는 온라인 시장, Market 을 만듦니다. 그리고 그 시장에 참여하여 연산력을 거래하는 수요자 Consumer 와 공급자 Supplier 시스템을 각각 만들어봅니다. 마지막으로 시장에서 이루어지는 모든 매매행위를 들여다 볼 수 있는 Monitor 시스템을 만들 것입니다.

물론, 이 프로젝트는 TGrid 를 익히는 데 도움을 주기 위해 만든 데모 프로젝트로써, Market 에서 연산력을 사고 파는 데 실제로 돈이 오가지는 않습니다. 하지만, 연산력을 상호 매매한다는 개념 자체가 허구인 것은 아닙니다. Market 을 통하여 이루어지는 일련의 행위, Consumer 와 Supplier 간의 상호 연산력의 소비와 공급은, 모두 허구가 아닌 실제의 것이 될 것입니다.

  • Market: 연산력을 사고팔 수 있는 중개 시장
  • Consumer: Supplier 의 연산력을 구매하여 이를 사용함
  • Supplier: Consumer 에게 자신의 연산력을 제공함
  • Monitor: Market 에서 이루어지는 모든 거래행위를 들여다 봄

2. Design

2.1. Participants

2.1.1. Market

Market 은 연산력을 사고 팔 수 있는 중개 시장이자, 메인 서버입니다.

Market 은 웹소켓 서버로써, Consumer 와 Supplier 및 Monitor 들을 클라이언트로 받아들입니다. 그리고 Consumer 와 Supplier 의 연산 자원 거래를 중개할 뿐 아니라, 거래가 체결된 Consumer 와 Supplier 간에 네트워크 통신 또한 중개합니다.

그리고 Market 에 참여하는 모든 Consumer 와 Supplier 들의 리스트 및 거래내역 일체는 실시간으로 Monitor 들에게 보고됩니다.

2.1.2. Consumer

Supplier 의 자원을 구매하여 사용하는 수요자입니다.

Consumer 는 Grid Computing 을 구성하는 데 필요한 연산력을 확보하기 위하여, Supplier 의 연산 자원을 가져다쓰는 수요자입니다.

Consumer 가 Supplier 의 자원을 구매하는 데 성공하거든, 각 Supplier 들에게 그들이 구동해야 할 프로그램의 소스를 건네주게 됩니다. 각 Supplier 는 이 소스코드를 컴파일하여 Worker 프로그램에 탑재하게 되고, Consumer 는 그 Worker 프로그램들과 연동하게 됩니다.

2.1.3. Supplier

자신의 연산 자원을 Consumer 에게 제공하는 공급자입니다.

Supplier 는 Consumer 에게 자신의 연산력을 제공하며, 이를 대가로 돈을 받습니다. 단, 이 프로젝트는 TGrid 에 대한 이해를 돕기 위한 데모 프로젝트에 지나지 않는지라, 실제로 돈이 오가지는 않습니다. 더불어 Market 에 Supplier 로 참여하는 것은 매우 간단하여, 단지 웹 브라우저를 열고 특정 URL 에 접속하기만 하면 될 뿐입니다.

그리고 Consumer 와 거래가 체결되거든, 자신의 연산 자원을 구매한 Consumer 로부터 프로그램 코드를 제공받게 됩니다. Supplier 는 이를 컴파일하여 Worker 프로그램에 탑재하게 되고, 해당 Worker 프로그램이 Consumer 와 연동하게 됩니다.

2.1.4. Monitor

Market 에서 이루어지는 모든 거래내역을 들여다볼 수 있는 감시자입니다.

Market 서버에 접속해있는 모든 시장 참여자들 (Consumer 와 Supplier) 들의 현황을 실시간으로 확인할 수 있으며, 시장에서 이루어지는 Consumer 와 Supplier 간의 모든 거래내역 또한 들여다볼 수 있는 감시자 시스템입니다.

2.2. Controllers

2.2.1. Market

아래는 Market 이 Consumer 에게 제공할 기능들을 정의한 Controller 입니다.

Consumer 가 이 Controller 를 통해 할 수 있는 일은 크게 두 가지로 나눌 수 있습니다. 첫째는 바로 현재 Market 에 참여중인 Supplier 들이 누구인지 아는 것입니다 (getSuppliers()). 그리고 둘째는 그들의 자원을 구원을 구매하고 (buyResource()), 그것들을 사용하는 것입니다 (assignees).

export namespace ConsumerChannel
{
    export interface IController
    {
        /**
         * 해당 *Consumer* 에게 배정된 *Supplier* 들의 `Controller` 리스트.
         */
        assignees: ArrayLike<Supplier.IController>;

        /**
         * Get 해당 *Consumer* 의 식별자 번호
         */
        getUID(): number;

        /**
         * Get *Market* 에 있는 전체 *Supplier* 들의 리스트
         */
        getSuppliers(): ISupplier[];

        /**
         * 대상 *Supplier* 의 자원을 구매함.
         * 
         * @param uid 대상 Supplier 의 식별자 번호 uid.
         * @return 성공 여부, 이미 다른 Consumer 에게 먼저 배정되었다면 false.
         */
        buyResource(uid: number): boolean;
    }
}

그리고 Market 이 Supplier 에 제공해주는 Provider 가 가진 객체는 딱 두가지 뿐입니다. 첫째는 해당 Supplier 에게 부여된 식별자 번호를 가져오는 함수이며, 둘째는 provider 변수로써, Consumer 를 기준으로는 Supplier 에게 제공하는 Provider 이고, Supplier 를 기준으로는 Driver<Controller> 가 되겠죠.

  • Consumer: WebConnector<Provider>.getProvider()
  • Supplier: WebConnector.getDriver<Controller>()
export namespace SupplierChannel
{
    export interface IController
    {
        /**
         * Consumer 에서 제공해주는 provider 객체
         */
        provider: object | null;

        /**
         * Get 해당 *Supplier* 의 식별자 번호
         */
        getUID(): number;
    }
}

2.2.2. Consumer

Consumer 가 Market 에게 제공하는 Provider 는, Market 은 단지 중간 매개체로써 경유하기만 할 뿐, 실질적으로는 Supplier 들에게 제공되는 Provider 라고 보아도 무방합니다. 실제로 Supplier 는 Market 서버에 접속한 후, Servant.IController 에 정의된 provider: object 변수를 이용하여 Consumer 의 Provider 객체가 제공하는 함수들을 이용합니다.

export namespace Consumer
{
    export interface IController
    {
        /**
         * 해당 Consumer 와 연결된 Supplier 들에게 제공할 provider 등의 리스트
         */
        servants: ArrayLike<Servant.IController>;
    }
}

export namespace Servant
{
    export interface IController
    {
        /**
         * Consumer 에서 제공해주는 provider
         */
        provider: object;

        /**
         * Consumer 와의 연결이 종료될 때까지 대기함
         */
        join(): void;

        /**
         * Consumer 와의 연결을 종료함
         */
        close(): void;
    }
}

2.2.3. Supplier

Supplier 가 Market 에게 제공하는 Provider 는, Market 은 단지 중간 매개체로써 경유하기만 할 뿐, 실질적으로는 Consumer 에게 제공되는 Provider 라고 보아도 무방합니다. 실제로 Consumer 는 Market 서버에 접속한 후, ConsumerChannel.IController 에 정의된 assignees: ArrayLike<Supplier.IController> 변수를 이용하여 Supplier 의 Provider 객체가 제공하는 함수들을 이용합니다.

따라서 Supplier 가 Market (실질적으로는 Consumer) 에 제공하는 함수들에 인터페이스를 정의한 Controller 를 보시면, 모든 함수들의 초점이 바로 Consumer 에게 맞추어져있음을 알 수 있습니다. 제일 먼저 Supplier 에게 자원을 제공할 대상 Consumer 를 알려주는 assign() 함수가 있고, 둘째로 Consumer 가 건네주는 프로그램 소스코드를 컴파일하여 Worker 프로그램을 생성-가동하는 compile() 함수가 있습니다.

그리고 마지막으로, provider 가 있습니다. 이것은, Supplier 가 Consumer 가 건네준 코드를 컴파일하여 생성한, Worker 프로그램에서 제공하는 Provider 를 사용할 수 있게 해 주는 변수입니다. Supplier 의 메인 프로그램이나 Consumer 프로그램의 기준에서는 Driver<Controller> 에 해당합니다.

  • WorkerServer<Provider>.getProvider()
  • WorkerConnector.getDriver<Controller>()
export namespace Supplier
{
    export interface IController
    {
        /**
         * 컴파일된 Worker 프로그램이 제공해주는 Provider.
         * 
         * *Supplier* 는 *Consumer* 가 제공해준 소스코드를 컴파일 ({@link compile}) 하여 
         * Worker 프로그램을 가동시킵니다. 객체 `provider` 는 바로 해당 Worker 프로그램이 제공하는
         * Provider (Supplier 메인 프로그램 기준으로는 Driver<Controller>) 를 의미합니다.
         * 
         *   - {@link WorkerServer.getProvider}
         *   - {@link WorkerConnector.getDriver}
         * 
         * @warning 반드시 {@link compile} 을 완료한 후에 사용할 것
         */
        provider: object;

        /**
         * 자원을 제공받을 *Consumer* 가 배정됨
         */
        assign(consumer_uid: number): void;

        /**
         * 소스코드를 컴파일하여 Worker 를 구동함
         * 
         * @param script 컴파일하여 구동할 Worker 프로그램의 소스코드
         * @param args 메인 함수 arguments
         */
        compile(script: string, ...args: string[]): void;

        /**
         * 구동 중인 Worker 를 종료함
         */
        close(): void;
    }
}

2.2.4. Monitor

Monitor 는 Market 에게 Provider 를 하나 제공합니다. 이 Provider 가 설계된 목적은 단 하나로써, 이를 단 한 마디로 정의하자면 "Market 아, 너에게서 일어나는 모든 일을 나에게 알려줘" 입니다. 따라서 해당 Provider 에 대한 인터페이스 격인 Controller 에 정의된 함수 역시 모두, Market 에서 일어나는 일을 Monitor 에게 알려주기 위한 것들입니다.

Monitor 는 Market 에서 이루어지는, Consumer 와 Supplier 간의, 전체 거래를 들여다 볼 수 있습니다. 즉, Consumer 가 각 Supplier 의 자원을 구입할 때마다, Market 은 Monitor 에게 해당 거래에 대하여 알려줍니다; transact(). 또한, Consumer 가 모든 연산 작업을 마치고 자신이 구입했던 Supplier 들의 자원을 반환하는 순간 역시, Market 은 Monitor 에게 이를 알려줍니다; release().

더불어 Monitor 는 현재 Market 에 참여하고 있는 Consumer 와 Supplier 의 전체 리스트를 알 수 있습니다. Monitor 가 처음 Market 서버에 접속하거든, Market 은 assign() 을 호출하여 전체 참여자 리스트를 Monitor 에게 전달합니다. 그리고 이후에 새로운 참여자가 들어오거나 나가거나 할 때마다, Market 은 관련 메소드 (insertConsumer()eraseSupplier() 등) 를 호출하여, 이 사실을 Monitor 에게 전달하게 됩니다.

export namespace Monitor
{
    export interface IController
    {
        /**
         * 시장 참여자 전체의 리스트를 할당
         * 
         * @param consumers 시장에 참여중인 *Consumer* 의 노드 리스트
         * @param suppliers 시장에 참여중인 *Supplier* 의 노드 리스트
         */
        assign(consumers: IConsumerNode[], suppliers: ISupplierNode[]): void;

        /**
         * *Conumser* 가 *Supplier* 의 자원을 구매하는 거래가 이루어짐
         * 
         * @param consumer 해당 *Consumer* 의 식별자 번호
         * @param supplier 해당 *Supplier* 의 식별자 번호
         */
        transact(consumer: number, supplier: number): void;

        /**
         * *Consumer* 가 모든 작업을 끝내고 구매하였던 자원을 반환함
         * 
         * @param consumer_uid 해당 *Consumer* 의 식별자 번호
         */
        release(consumer_uid: number): void;

        //----
        // INDIVIDUAL I/O
        //----
        /**
         * 신규 *Consumer* 의 입장
         * 
         * @param consumer *Consumer* 노드 정보
         */
        insertConsumer(consumer :IConsumerNode): void;

        /**
         * 신규 *Supplier* 의 입장
         *
         * @param supplier *Supplier* 노드 정보
         */
        insertSupplier(supplier: ISupplierNode): void;

        /**
         * 기존 *Consumer* 의 퇴장
         * 
         * @param uid 해당 *Consumer* 의 식별자 번호
         */
        eraseConsumer(uid: number): void;

        /**
         * 기존 *Supplier* 의 퇴장
         * 
         * @param uid 해당 *Supplier* 의 식별자 번호
         */
        eraseSupplier(uid: number): void;
    }
}

2.3. Class Diagram

Class Diagram

3. Core Implementation

3.1. Market

Market 은 Consumer 와 Supplier 간의 컴퓨팅 자원 거래가 이루어지는 중개시장입니다.

따라서 Market 클래스의 구현 코드는 제일 먼저 웹소켓 서버를 개설하는 것에서부터 시작합니다. 그리고 Market 서버에 클라이언트가 접속할 때마다, 해당 클라이언트가 접속에 사용한 주소를 기준으로 그 역할을 식별하고 전담 클래스를 생성하여 지원하게 됩니다.

Path Role Generated Class
/consumer Consumer ConsummerChannel
/supplier Supplier SupplierChannel
/monitor Monitor Driver

core/market/Market.ts

import { HashMap } from "tstl/container/HashMap";
import { WebServer } from "tgrid/protocols/web/WebServer";
import { WebAcceptor } from "tgrid/protocols/web/WebAcceptor";
import { Driver } from "tgrid/components/Driver";

import { ConsumerChannel } from "./ConsumerChannel";
import { SupplierChannel } from "./SupplierChannel";
import { Monitor } from "../monitor/Monitor";

export class Market
{
    private server_: WebServer<Provider>;

    private consumers_: HashMap<number, ConsumerChannel>;
    private suppliers_: HashMap<number, SupplierChannel>;
    private monitors_: HashMap<number, Driver<Monitor.IController>>;

    private static sequence_: number = 0;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * Default Constructor.
     */
    public constructor()
    {
        this.server_ = new WebServer<Provider>();

        this.consumers_ = new HashMap();
        this.suppliers_ = new HashMap();
        this.monitors_ = new HashMap();
    }

    public open(port: number): Promise<void>
    {
        return this.server_.open(port, async (acceptor: WebAcceptor<Provider>) =>
        {
            let uid: number = ++Market.sequence_;
            if (acceptor.path === "/monitor")
            {
                await this._Handle_monitor(uid, acceptor);
                return;
            }

            //----
            // PRELIMINARIES
            //----
            // DETERMINE ACTOR
            let instance: Instance;
            let dictionary: HashMap<number, Instance>;

            // MONITOR HANDLER
            let monitor_inserter: (drvier: Driver<Monitor.IController>) => Promise<void>;
            let monitor_eraser: (drvier: Driver<Monitor.IController>) => Promise<void>;

            // PARSE PATH
            if (acceptor.path === "/consumer")
            {
                instance = await ConsumerChannel.create(uid, this, acceptor as WebAcceptor<ConsumerChannel.Provider>);
                dictionary = this.consumers_;

                monitor_inserter = driver => driver.insertConsumer((instance as ConsumerChannel).toNode());
                monitor_eraser = driver => driver.eraseConsumer(uid);
            }
            else if (acceptor.path === "/supplier")
            {
                instance = await SupplierChannel.create(uid, acceptor as WebAcceptor<SupplierChannel.Provider>);
                dictionary = this.suppliers_;

                monitor_inserter = driver => driver.insertSupplier((instance as SupplierChannel).toNode());
                monitor_eraser = driver => driver.eraseSupplier(uid);
            }
            else
            {
                acceptor.reject(404, "Invalid URL");
                return;
            }

            //----
            // PROCEDURES
            //----
            // ENROLL TO DICTIONARY
            dictionary.emplace(uid, instance);
            console.log("A participant has come", this.consumers_.size(), this.suppliers_.size());

            // INFORM TO MONITORS
            for (let entry of this.monitors_)
                monitor_inserter(entry.second).catch(() => {});

            //----
            // DISCONNECTION
            //----
            // JOIN CONNECTION
            try 
            { 
                await acceptor.join(); 
            } 
            catch {}

            // ERASE ON DICTIONARY
            dictionary.erase(uid);
            console.log("A participant has left", this.consumers_.size(), this.suppliers_.size());

            // INFORM TO MONITORS
            for (let entry of this.monitors_)
                monitor_eraser(entry.second).catch(() => {});
        });
    }

    public async close(): Promise<void>
    {
        await this.server_.close();

        this.consumers_.clear();
        this.suppliers_.clear();
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    public getSuppliers(): HashMap<number, SupplierChannel>
    {
        return this.suppliers_;
    }

    public getMonitors(): HashMap<number, Driver<Monitor.IController>>
    {
        return this.monitors_;
    }

    /* ----------------------------------------------------------------
        MONITOR HANDLER
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private async _Handle_monitor(uid: number, acceptor: WebAcceptor<{}>): Promise<void>
    {
        console.log("A monitor has come", this.monitors_.size());

        // ACCEPT CONNECTION
        let driver: Driver<Monitor.IController> = acceptor.getDriver<Monitor.IController>();
        await acceptor.accept(null);

        this.monitors_.emplace(uid, driver);

        //----
        // SEND CURRENT RELATIONSHIP
        //----
        // DO ASSIGN
        await driver.assign
        (
            [...this.consumers_].map(it => it.second.toNode()), 
            [...this.suppliers_].map(it => it.second.toNode())
        );

        //----
        // JOIN CONNECTION
        //----
        await acceptor.join();
        this.monitors_.erase(uid);

        console.log("A monitor has left", this.monitors_.size());
    }
}

type Provider = ConsumerChannel.Provider | SupplierChannel.Provider;
type Instance = ConsumerChannel | SupplierChannel;

ConsumerChannel 은 Market 서버에 접속한 클라이언트 Consumer 에 대응하기 위한 클래스입니다.

Market 서버 프로그램은 이 ConsumerChannel 클래스를 통하여 Consumer 가 구입한 Supplier 들의 자원 리스트를 기록하고 관리합니다. 그리고 Consumer 는 이 ConsumerChannel 클래스의 내부 네임스페이스에 정의된 ConsumerChannel.Provider 를 통하여, Market 서버에 접속해있는 전체 Supplier 들의 리스트를 열람하고, 그들의 자원을 구매하고 사용할 수 있습니다.

core/market/ConsumerChannel.ts

import { WebAcceptor } from "tgrid/protocols/web/WebAcceptor";
import { HashMap } from "tstl/container/HashMap";
import { ArrayDict } from "../../utils/ArrayDict";
import { DateUtil } from "../../utils/DateUtil";

import { IConsumerNode } from "../monitor/IConsumerNode";
import { ISupplier } from "../supplier/ISupplier";
import { SupplierChannel } from "./SupplierChannel";

import { Market } from "./Market";
import { Supplier } from "../supplier/Supplier";
import { Consumer } from "../consumer/Consumer";

export class ConsumerChannel
{
    public readonly uid: number;
    public readonly created_at: Date;

    /**
     * @hidden
     */
    private market_: Market;

    /**
     * @hidden
     */
    private acceptor_: WebAcceptor<ConsumerChannel.Provider>;

    /**
     * @hidden
     */
    private servants_: HashMap<number, SupplierChannel>;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(uid: number, market: Market, acceptor: WebAcceptor<ConsumerChannel.Provider>)
    {
        this.uid = uid;
        this.market_ = market;
        this.acceptor_ = acceptor;

        this.created_at = new Date();
        this.servants_ = new HashMap();
    }

    /**
     * @internal
     */
    public static async create(uid: number, market: Market, acceptor: WebAcceptor<ConsumerChannel.Provider>): Promise<ConsumerChannel>
    {
        let ret: ConsumerChannel = new ConsumerChannel(uid, market, acceptor);
        await ret.acceptor_.accept(new ConsumerChannel.Provider(ret));

        ret._Handle_disconnection();
        return ret;
    }

    /**
     * @hidden
     */
    private async _Handle_disconnection(): Promise<void>
    {
        try { await this.acceptor_.join(); } catch {}
        for (let it of this.servants_)
            await it.second.release(this);
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    /**
     * @internal
     */
    public getMarket(): Market
    {
        return this.market_;
    }

    public getDriver()
    {
        return this.acceptor_.getDriver<Consumer.IController>();
    }

    /**
     * @internal
     */
    public getAssignees()
    {
        return this.servants_;
    }

    public toNode(): IConsumerNode
    {
        return {
            uid: this.uid,
            created_at: DateUtil.to_string(this.created_at, true),
            servants: [...this.servants_].map(it => it.first)
        };
    }

    /* ----------------------------------------------------------------
        SUPPLIERS I/O
    ---------------------------------------------------------------- */
    /**
     * @internal
     */
    public async transact(supplier: SupplierChannel): Promise<boolean>
    {
        if (this.servants_.has(supplier.uid) || // DUPLICATED
            await supplier.transact(this) === false) // MONOPOLIZED
            return false;

        // CONSTR5UCT SERVANT
        await this.servants_.emplace(supplier.uid, supplier);

        // PROVIDER FOR CONSUMER (SERVANT) := CONTROLLER OF SUPPLIER
        let provider = supplier.getDriver();
        this.acceptor_.getProvider()!.assginees.set(supplier.uid, provider);

        // RETURN WITH ASSIGNMENT
        await provider.assign(this.uid);
        for (let entry of this.market_.getMonitors())
            entry.second.transact(this.uid, supplier.uid).catch(() => {});

        return true;
    }

    /**
     * @internal
     */
    public async release(supplier: SupplierChannel): Promise<void>
    {
        this.servants_.erase(supplier.uid);
        this.acceptor_.getProvider()!.assginees.erase(supplier.uid);

        await supplier.release(this);
        for (let entry of this.market_.getMonitors())
            entry.second.release(supplier.uid).catch(() => {});
    }
}

export namespace ConsumerChannel
{
    /**
     * @hidden
     */
    export interface IController
    {
        assginees: ArrayLike<Supplier.IController>;

        getUID(): number;
        getSuppliers(): ISupplier[];
        buyResource(supplier: ISupplier): Promise<boolean>;
    }

    /**
     * @hidden
     */
    export class Provider implements IController
    {
        private consumer_: ConsumerChannel;
        private market_: Market;
        public assginees: ArrayDict<Supplier.IController>;

        public constructor(consumer: ConsumerChannel)
        {
            this.consumer_ = consumer;
            this.market_ = consumer.getMarket();
            this.assginees = new ArrayDict();
        }

        public getUID(): number
        {
            return this.consumer_.uid;
        }

        public getSuppliers(): ISupplier[]
        {
            return this.market_.getSuppliers().toJSON().map(entry => entry.second.toJSON());
        }

        public async buyResource(supplier: ISupplier): Promise<boolean>
        {
            let map = this.market_.getSuppliers();
            let it = map.find(supplier.uid);

            if (it.equals(map.end()) === true)
                return false;

            return await this.consumer_.transact(it.second);
        }
    }
}

SupplierChannel 은 Market 서버에 접속한 클라이언트 Supplier 에 대응하기 위한 클래스입니다.

Market 서버 프로그램은 이 SupplierChannel 클래스를 통하여, 해당 Supplier 의 performance 정보를 기록하고 관리하며, 마찬가지로 해당 Supplier 의 자원을 구입한 Consumer 정보 역시 이 SupplierChannel 클래스에 기록됩니다.

Supplier 는 이 SupplierChannel 클래스의 내부 네임스페이스에 정의된 SupplierChannel.Provider 를 통하여, Consumer 가 자신에게 할당해 준 Provider 의 함수들을 Driver<Controller> 를 통하여 원격 호출할 수 있습니다.

core/market/SupplierChannel.ts

import { WebAcceptor } from "tgrid/protocols/web/WebAcceptor";
import { Driver } from "tgrid/components/Driver";
import { Mutex } from "tstl/thread/Mutex";
import { UniqueLock } from "tstl/thread/UniqueLock";
import { DateUtil } from "../../utils/DateUtil";

import { ISupplier } from "../supplier/ISupplier";
import { ISupplierNode } from "../monitor/ISupplierNode";
import { Supplier } from "../supplier/Supplier";

import { ConsumerChannel } from "./ConsumerChannel";

export class SupplierChannel implements Readonly<ISupplier>
{
    public readonly uid: number;
    public readonly created_at: Date;
    public readonly performance: ISupplier.IPerformance;

    private acceptor_: WebAcceptor<SupplierChannel.Provider>;
    private mutex_: Mutex;
    private consumer_: ConsumerChannel | null;
    private assigned_at_: Date | null;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(uid: number, acceptor: WebAcceptor<SupplierChannel.Provider>)
    {
        this.uid = uid;
        this.created_at = new Date();

        this.acceptor_ = acceptor;
        this.performance = 
        {
            mean: 1.0,
            risk: 0.0,
            credit: 0.0
        };

        this.mutex_ = new Mutex();
        this.consumer_ = null;
        this.assigned_at_ = null;
    }

    /**
     * @internal
     */
    public static async create(uid: number, acceptor: WebAcceptor<SupplierChannel.Provider>): Promise<SupplierChannel>
    {
        let ret: SupplierChannel = new SupplierChannel(uid, acceptor);
        await ret.acceptor_.accept(new SupplierChannel.Provider(ret));

        ret._Handle_disconnection();
        return ret;
    }

    /**
     * @hidden
     */
    private async _Handle_disconnection(): Promise<void>
    {
        try { await this.acceptor_.join(); } catch {}
        await UniqueLock.lock(this.mutex_, async () =>
        {
            if (this.consumer_ !== null)
                this.consumer_.release(this);
        });
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    /**
     * @inheritDoc
     */
    public get free(): boolean
    {
        return this.consumer_ === null;
    }

    public getDriver(): Driver<Supplier.IController>
    {
        return this.acceptor_.getDriver<Supplier.IController>();
    }

    public getConsumer(): ConsumerChannel | null
    {
        return this.consumer_;
    }

    public toJSON(): ISupplier
    {
        let ret: ISupplier = 
        {
            uid: this.uid,
            performance: this.performance,
            free: this.free
        };
        return ret;
    }

    public toNode(): ISupplierNode
    {
        return {
            uid: this.uid,
            created_at: DateUtil.to_string(this.created_at, true),
            assigned_at: this.assigned_at_
                ? DateUtil.to_string(this.assigned_at_, true)
                : null
        };
    }

    /* ----------------------------------------------------------------
        ASSIGNER
    ---------------------------------------------------------------- */
    /**
     * @internal
     */
    public async transact(consumer: ConsumerChannel): Promise<boolean>
    {
        let ret: boolean;
        await UniqueLock.lock(this.mutex_, () =>
        {
            if ((ret = this.free) === true)
            {
                this.consumer_ = consumer;
                this.assigned_at_ = new Date();
                this.acceptor_.getProvider()!.provider = consumer.getDriver().servants[this.uid];
            }
        });
        return ret!;
    }

    /**
     * @internal
     */
    public async release(consumer: ConsumerChannel): Promise<void>
    {
        await UniqueLock.lock(this.mutex_, async () =>
        {
            if (this.consumer_ === consumer)
            {
                // ERASE CONSUMER
                this.consumer_ = null;
                this.assigned_at_ = null;
                this.acceptor_.getProvider()!.provider = null;

                // TO ANTICIPATE ABUSING
                this.getDriver().close().catch(() => {});
            }
        });
    }
}

export namespace SupplierChannel
{
    export interface IController
    {
        provider: object | null;
        getUID(): number;
    }

    export class Provider implements IController
    {
        private channel_: SupplierChannel;

        // PROVIDER FOR SUPPLIER := CONTROLLER OF CONSUMER (SERVANT)
        public provider: object | null = null;

        public constructor(channel: SupplierChannel)
        {
            this.channel_ = channel;
        }

        public getUID(): number
        {
            return this.channel_.uid;
        }
    }
}

3.2. Consumer

Consumer 클래스는 Consumer 를 위하여 제작된 Facade 클래스입니다.

Consumer 는 Consumer.participate() 메서드를 이용하여 Market 서버에 접속함으로써, 시장에 참여할 수 있습니다. 그리고 Consumer.getSuppliers() 메서드를 이용하여 시장에 참여중인 전체 Supplier 들의 리스트를 조회할 수 있고, 이들 중 원하는 Supplier 들의 자원을 Consumer.buyResource() 메서드를 이용하여 구입할 수 있습니다.

core/consumer/Consumer.ts

import { WebConnector } from "tgrid/protocols/web/WebConnector";
import { Driver } from "tgrid/components/Driver";
import { ArrayDict } from "../../utils/ArrayDict";

import { Servant } from "./Servant";
import { ISupplier } from "../supplier/ISupplier";
import { ConsumerChannel } from "../market/ConsumerChannel";

export class Consumer
{
    public readonly uid: number;
    private connector_: WebConnector<Consumer.Provider>;

    private market_: Driver<ConsumerChannel.IController>;
    private servants_: Servant[];

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    private constructor(uid: number, connector: WebConnector<Consumer.Provider>)
    {
        this.uid = uid;
        this.connector_ = connector;

        this.market_ = connector.getDriver<ConsumerChannel.IController>();
        this.servants_ = [];
    }

    public static async participate(url: string): Promise<Consumer>
    {
        let provider: Consumer.Provider = new Consumer.Provider();
        let connector: WebConnector<Consumer.Provider> = new WebConnector(provider);
        await connector.connect(url);

        let uid: number = await connector.getDriver<ConsumerChannel.IController>().getUID();
        return new Consumer(uid, connector);
    }

    public leave(): Promise<void>
    {
        return this.connector_.close();
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    public getSuppliers(): Promise<ISupplier[]>
    {
        return this.market_.getSuppliers();
    }

    public async buyResource(base: ISupplier): Promise<Servant | null>
    {
        if (await this.market_.buyResource(base) === false)
            return null;

        let driver = this.market_.assginees[base.uid];
        let ret: Servant = Servant.create(base, driver);

        ret.join().then(() =>
        {
            for (let i: number = 0; i < this.servants_.length; ++i)
                if (this.servants_[i].uid === ret.uid)
                {
                    this.servants_.splice(i, 1);
                    break;
                }
            this.connector_.getProvider()!.servants.erase(ret.uid);
        });

        this.servants_.push(ret);
        this.connector_.getProvider()!.servants.set(ret.uid, new Servant.Provider(ret));

        return ret;
    }
}

export namespace Consumer
{
    /**
     * @internal
     */
    export interface IController
    {
        servants: ArrayLike<Servant.IController>;
    }

    /**
     * @internal
     */
    export class Provider implements IController
    {
        public servants: ArrayDict<Servant.Provider> = new ArrayDict();
    }
}

Consumer.buyResource() 를 통해 구입한 Supplier 의 자원은 Servant 클래스를 통하여 관리됩니다. 이 Servant 클래스의 역할은 Consumer 와 Supplier 의 Worker 프로그램을 잇는 Communicator 클래스입니다. 비록 Consumer 와 Supplier 의 Worker 프로그램 사이에는 Market 과 Supplier 의 메인 프로그램이 중간 매개체로써 자리하고 있더라도 말입니다.

Consumer 는 Servant.compile() 메서드를 통해 Supplier 에게 제공할 Provider 와, 그것이 실행해야 할 프로그램 소스코드를 건네줄 수 있습니다. 대상 Supplier 는 해당 프로그램 소스코드를 컴파일하고, 이를 새 Worker 프로그램에 탑재하여 구동하게 됩니다. 그리고 그 Worker 프로그램이 바로, 현 Consumer 프로그램과 연동하게 될 최종 인스턴스입니다.

core/consumer/Servant.ts

import { Driver } from "tgrid/components/Driver";
import { ConditionVariable } from "tstl/thread/ConditionVariable";

import { ISupplier } from "../supplier/ISupplier";
import { Supplier } from "../supplier/Supplier";

export class Servant implements Readonly<ISupplier>
{
    /**
     * @hidden
     */
    private base_: ISupplier;

    /**
     * @hidden
     */
    private assignee_: Driver<Supplier.IController>;

    /**
     * @hidden
     */
    private joiners_: ConditionVariable;

    /**
     * @hidden
     */
    private provider_?: object | null;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(base: ISupplier, driver: Driver<Supplier.IController>)
    {
        this.base_ = base;
        this.assignee_ = driver;

        this.joiners_ = new ConditionVariable();
    }

    /**
     * @internal
     */
    public static create(base: ISupplier, driver: Driver<Supplier.IController>): Servant
    {
        return new Servant(base, driver);
    }

    public async compile(provider: object | null, script: string, ...args: string[]): Promise<void>
    {
        this.provider_ = provider;
        await this.assignee_.compile(script, ...args);
    }

    public async close(): Promise<void>
    {
        await this.assignee_.close();
        await this.joiners_.notify_all();
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    public get provider(): object | null | undefined
    {
        return this.provider_;
    }

    public getDriver<Controller extends object>(): Driver<Controller>
    {
        return this.assignee_.provider as Driver<Controller>;
    }

    public join(): Promise<void>;
    public join(ms: number): Promise<boolean>;
    public join(until: Date): Promise<boolean>;

    public join(param?: number | Date): Promise<void | boolean>
    {
        if (param === undefined)
            return this.joiners_.wait();
        else if (param instanceof Date)
            return this.joiners_.wait_until(param);
        else
            return this.joiners_.wait_for(param);
    }

    /* ----------------------------------------------------------------
        PROPERTIES
    ---------------------------------------------------------------- */
    public get uid(): number
    {
        return this.base_.uid;
    }
    public get performance(): ISupplier.IPerformance
    {
        return this.base_.performance;
    }
    public get free(): boolean
    {
        return false;
    }
}

export namespace Servant
{
    /**
     * @internal
     */
    export interface IController
    {
        provider: object;

        join(): Promise<void>;
        close(): Promise<void>;
    }

    /**
     * @internal
     */
    export class Provider
    {
        private base_: Servant;

        public constructor(base: Servant)
        {
            this.base_ = base;
        }

        public get provider(): object
        {
            return this.base_.provider!;
        }

        public join(): Promise<void>
        {
            return this.base_.join();
        }

        public close(): Promise<void>
        {
            return this.base_.close();
        }
    }
}

3.3. Supplier

Supplier 클래스는 Supplier 를 위해 만들어진 Facade Controller 입니다.

Supplier 는 Supplier.participate() 메서드를 호출하여 Market 서버에 접속함으로써, 시장에 참여할 수 있습니다. 그리고 Supplier 클래스의 내부 네임스페이스에 정의된 Supplier.Provider 를 이용하여, Market 과 Consumer 가 필요로 하는 기능들을 제공하고 있습니다.

core/supplier/Supplier.ts

import { EventEmitter } from "events";

import { WebConnector } from "tgrid/protocols/web/WebConnector";
import { WorkerConnector } from "tgrid/protocols/workers/WorkerConnector";
import { Driver } from "tgrid/components/Driver";

import { IPointer } from "tstl/functional/IPointer";
import { SupplierChannel } from "../market/SupplierChannel";

export class Supplier extends EventEmitter
{
    public readonly uid: number;

    /**
     * @hidden
     */
    private connector_: WebConnector<Supplier.Provider>;

    /* ----------------------------------------------------------------
        CONSTRUCTOR
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(uid: number, connector: WebConnector<Supplier.Provider>)
    {
        super();

        this.uid = uid;
        this.connector_ = connector;
    }

    public static async participate(url: string): Promise<Supplier>
    {
        // POINTERS - LAZY CONSTRUCTION
        let basePtr: IPointer<Supplier> = { value: null! };
        let workerPtr: IPointer<WorkerConnector> = { value: null! };

        // PREPARE ASSETS
        let provider = new Supplier.Provider(basePtr, workerPtr);
        let connector: WebConnector<Supplier.Provider> = new WebConnector(provider);
        let driver: Driver<SupplierChannel.IController> = connector.getDriver<SupplierChannel.IController>();

        // CONSTRUCT WORKER
        let worker: WorkerConnector = new WorkerConnector(driver.provider);
        workerPtr.value = worker;

        // CONNECTION & CONSTRUCTION
        await connector.connect(url);
        let ret: Supplier = new Supplier(await driver.getUID(), connector);
        basePtr.value = ret;

        // RETURNS
        return ret;
    }

    public leave(): Promise<void>
    {
        return this.connector_.close();
    }
}

export namespace Supplier
{
    /**
     * @internal
     */
    export interface IController
    {
        provider: object;

        assign(consumerUID: number): void;
        compile(script: string, ...args: string[]): Promise<void>;
        close(): Promise<void>;
    }

    /**
     * @internal
     */
    export class Provider implements IController
    {
        private base_ptr_: IPointer<Supplier>;
        private worker_ptr_: IPointer<WorkerConnector>;
        private consumer_uid_?: number;

        /* ----------------------------------------------------------------
            CONSTRUCTOR
        ---------------------------------------------------------------- */
        public constructor(basePtr: IPointer<Supplier>, workerPtr: IPointer<WorkerConnector>)
        {
            this.base_ptr_ = basePtr;
            this.worker_ptr_ = workerPtr;
        }

        public assign(consumerUID: number): void
        {
            this.consumer_uid_ = consumerUID;
            this.base_ptr_.value.emit("assign", consumerUID);
        }

        public async compile(code: string, ...args: string[]): Promise<void>
        {
            // FOR SAFETY
            let state = this.worker_ptr_.value.state;
            if (state !== WorkerConnector.State.NONE && state !== WorkerConnector.State.CLOSED)
                await this.worker_ptr_.value.close();

            // DO COMPILE
            await this.worker_ptr_.value.compile(code, ...args);

            // EMIT EVENTS
            this.base_ptr_.value.emit("compile", this.consumer_uid_, code, ...args);
            this.worker_ptr_.value.join().then(() =>
            {
                this.base_ptr_.value.emit("close", this.consumer_uid_);
            });
        }

        public close(): Promise<void>
        {
            return this.worker_ptr_.value.close();
        }

        /* ----------------------------------------------------------------
            ACCESSORS
        ---------------------------------------------------------------- */
        public get provider(): Driver<object>
        {
            return this.worker_ptr_.value.getDriver<object>();
        }

        public isFree(): boolean
        {
            return this.worker_ptr_.value.state === WorkerConnector.State.NONE
                || this.worker_ptr_.value.state === WorkerConnector.State.CLOSED;
        }
    }
}

또한, Supplier 의 식별자 및 performance 에 대한 정보는, 아래 ISupplier 구조체로 요약될 수 있습니다. Consumer 는 이 ISupplier 에 기재된 Supplier 의 요약정보를 보고, 해당 Supplier 의 자원 구매 여부를 결정하게 됩니다.

core/supplier/ISupplier.ts

export interface ISupplier
{
    uid: number;
    performance: ISupplier.IPerformance;
    free: boolean;
}
export namespace ISupplier
{
    export interface IPerformance
    {
        mean: number;
        risk: number;
        credit: number;
    }
}

3.4. Monitor

Monitor 클래스는 Monitor 를 위해 만들어진 Facade Controller 입니다.

Monitor 는 Monitor.participate() 메서드를 호출하여 Market 서버에 접속합니다. 그리고 Monitor 클래스의 내부 네임스페이스에 정의된 Monitor.Provider 클래스와 Monitor 클래스의 다양한 accessor 메서드들을 이용하여, 시장에서 발생하는 모든 거래내역을 실시간으로 들여다볼 수 있습니다.

반대로 얘기하면, Market 은 시장에서 참여자 리스트에 변동이 생기거나 새로운 거래내역이 발생할 때마다, Driver 의 함수들을 원격 호출하여 이를 Monitor 에게 알려줍니다.

`core/monitor/Monitor.ts

import { WebConnector } from "tgrid/protocols/web";

import { EventEmitter } from "events";
import { HashMap } from "tstl/container/HashMap";
import { ConditionVariable } from "tstl/thread/ConditionVariable";
import { IPointer } from "tstl/functional/IPointer";

import { ConsumerNode } from "./ConsumerNode";
import { SupplierNode } from "./SupplierNode";
import { IConsumerNode } from "./IConsumerNode";
import { ISupplierNode } from "./ISupplierNode";

export class Monitor
{
    /**
     * @hidden
     */
    private connector_: WebConnector;

    /**
     * @hidden
     */
    private consumers_: HashMap<number, ConsumerNode>;

    /**
     * @hidden
     */
    private suppliers_: HashMap<number, SupplierNode>;

    /**
     * @hidden
     */
    private emitter_: EventEmitter;

    /* ----------------------------------------------------------------
        CONSTRUCTORS
    ---------------------------------------------------------------- */
    /**
     * @hidden
     */
    private constructor(connector: WebConnector)
    {
        this.connector_ = connector;

        this.consumers_ = new HashMap();
        this.suppliers_ = new HashMap();
        this.emitter_ = new EventEmitter();
    }

    /**
     * @internal
     */
    public static async participate(url: string): Promise<Monitor>
    {
        // PREPARE ASSETS
        let ptr: IPointer<Monitor> = { value: null! };
        let waitor: ConditionVariable = new ConditionVariable();

        let provider: Monitor.Provider = new Monitor.Provider(ptr, waitor);
        let connector: WebConnector = new WebConnector(provider);

        // LAZY CREATION
        ptr.value = new Monitor(connector);

        // CONNECT & WAIT MARKET
        await connector.connect(url);
        await waitor.wait();

        // RETURNS
        return ptr.value;
    }

    public leave(): Promise<void>
    {
        return this.connector_.close();
    }

    /* ----------------------------------------------------------------
        ACCESSORS
    ---------------------------------------------------------------- */
    public on(type: "refresh", listener: (consumers: HashMap<number, ConsumerNode>, suppliers: HashMap<number, SupplierNode>) => void): void
    {
        this.emitter_.on(type, listener);
    }

    public getConsumers(): HashMap<number, ConsumerNode>
    {
        return this.consumers_;
    }

    public getSuppliers(): HashMap<number, SupplierNode>
    {
        return this.suppliers_;
    }

    /**
     * @internal
     */
    public _Refresh(): void
    {
        this.emitter_.emit("refresh", this.consumers_, this.suppliers_);
    }
}

export namespace Monitor
{
    /**
     * @internal
     */
    export interface IController
    {
        assign(consumers: IConsumerNode[], suppliers: ISupplierNode[]): Promise<void>;

        insertConsumer(consumer :IConsumerNode): void;
        insertSupplier(supplier: ISupplierNode): void;
        eraseConsumer(uid: number): void;
        eraseSupplier(uid: number): void;

        transact(consumer: number, supplier: number): void;
        release(uid: number): void;
    }

    /**
     * @internal
     */
    export class Provider implements IController
    {
        private ptr_: IPointer<Monitor>;
        private waitor_: ConditionVariable;

        /* ----------------------------------------------------------------
            CONSTRUCTORS
        ---------------------------------------------------------------- */
        public constructor(ptr: IPointer<Monitor>, waitor: ConditionVariable)
        {
            this.ptr_ = ptr;
            this.waitor_ = waitor;
        }

        public async assign(rawConsumers: IConsumerNode[], rawSuppliers: ISupplierNode[]): Promise<void>
        {
            let base: Monitor = this.ptr_.value;
            for (let raw of rawSuppliers)
                base.getSuppliers().emplace(raw.uid, new SupplierNode(raw));

            for (let raw of rawConsumers)
            {
                let consumer: ConsumerNode = new ConsumerNode(raw);
                for (let uid of raw.servants)
                {
                    let supplier: SupplierNode = base.getSuppliers().get(uid);
                    consumer.servants.emplace(uid, supplier);
                }
                base.getConsumers().emplace(raw.uid, consumer);
            }
            await this.waitor_.notify_all();
        }

        /* ----------------------------------------------------------------
            ELEMENTS I/O
        ---------------------------------------------------------------- */
        public insertConsumer(raw: IConsumerNode): void
        {
            let base: Monitor = this.ptr_.value;
            let consumer: ConsumerNode = new ConsumerNode(raw);

            base.getConsumers().emplace(consumer.uid, consumer);
            base._Refresh();
        }

        public insertSupplier(raw: ISupplierNode): void
        {
            let base: Monitor = this.ptr_.value;
            base.getSuppliers().emplace(raw.uid, new SupplierNode(raw));
            base._Refresh();
        }

        public eraseConsumer(uid: number): void
        {
            let base: Monitor = this.ptr_.value;

            let consumer: ConsumerNode = base.getConsumers().get(uid); 
            for (let entry of consumer.servants)
                entry.second.release();

            base.getConsumers().erase(uid);
            base._Refresh();
        }

        public eraseSupplier(uid: number): void
        {
            let base: Monitor = this.ptr_.value;

            let supplier: SupplierNode = base.getSuppliers().get(uid);
            if (supplier.assignee !== null)
                supplier.assignee.servants.erase(uid);

            supplier.release();
            base.getSuppliers().erase(uid);

            base._Refresh();
        }

        /* ----------------------------------------------------------------
            RELATIONSHIPS
        ---------------------------------------------------------------- */
        public transact(customerUID: number, supplierUID: number): void
        {
            let base: Monitor = this.ptr_.value;
            let consumer: ConsumerNode = base.getConsumers().get(customerUID);
            let supplier: SupplierNode = base.getSuppliers().get(supplierUID);

            consumer.servants.emplace(supplier.uid, supplier);
            supplier.assign(consumer);

            base._Refresh();
        }

        public release(uid: number): void
        {
            let base: Monitor = this.ptr_.value;

            let supplier: SupplierNode = base.getSuppliers().get(uid);
            if (supplier.assignee !== null)
                supplier.assignee.servants.erase(uid);

            supplier.release();
            base._Refresh();
        }
    }
}

ConsumerNode 클래스는 Market 에 참여한 Consumer 를 표현하기 위해 제작된 클래스로써, 해당 Consumer 구매한 Supplier 의 자원 내역을 기록하고 있습니다.

core/monitor/ConsumerNode.ts

import { HashMap } from "tstl/container/HashMap";
import { SupplierNode } from "./SupplierNode";
import { IConsumerNode } from "./IConsumerNode";

export class ConsumerNode
{
    public readonly uid: number;
    public readonly servants: HashMap<number, SupplierNode>;
    public readonly created_at: Date;

    public constructor(raw: IConsumerNode)
    {
        this.uid = raw.uid;
        this.servants = new HashMap();
        this.created_at = new Date(raw.created_at);
    }
}

SupplierNode 클래스는 Market 에 참여한 Supplier 를 표현하기 위해 설계된 클래스로써, 해당 Supplier 의 자원을 구매하여 사용하고 있는 Consumer 에 대한 정보 또한 기록하고 있습니다.

core/monitor/SupplierNode.ts

import { ConsumerNode } from "./ConsumerNode";
import { ISupplierNode } from "./ISupplierNode";

export class SupplierNode
{
    public readonly uid: number;
    public readonly created_at: Date;

    /**
     * @hidden
     */
    private assignee_: ConsumerNode | null;

    /**
     * @hidden
     */
    private assigned_at_: Date | null;

    public constructor(raw: ISupplierNode)
    {
        this.uid = raw.uid;
        this.created_at = new Date(raw.created_at);

        this.assignee_ = null;
        this.assigned_at_ = raw.assigned_at
            ? new Date(raw.assigned_at)
            : null;
    }

    public get assignee(): ConsumerNode | null
    {
        return this.assignee_;
    }

    public get assigned_at(): Date | null
    {
        return this.assigned_at_;
    }

    /**
     * @internal
     */
    public assign(obj: ConsumerNode): void
    {
        this.assignee_ = obj;
        this.assigned_at_ = new Date();
    }

    /**
     * @internal
     */
    public release(): void
    {
        this.assignee_ = null;
        this.assigned_at_ = null;
    }
}

results matching ""

    No results matching ""