Data Caching, Storage and Retrieval

Phase 3 - Orders

Overview

  • The DriveWealth platform allows partners to build a data replication layer between the customer, and DriveWealth. There are many reasons why a partner would want to power a caching layer (including those below)

  • Historical Charts - plotting charts to display how securities have changed overtime using our historical market data endpoint is common practice among DW partners

  • Rendering charts is essential when showing customers how their portfolio or positions have changed over time. When partners’ cache this data, it allows for faster processing and presentation of visual charts/graphs

  • Understanding customer navigation of an application, what data is needed, when, and where. This layer helps to service customers’ faster and more efficiently with reduced (latency) when retrieving data.

  • Partners’ can also take advantage of understanding different actions that are happening in the DriveWealth system.

The partner can implement this caching layer using a mix of:

  • The DriveWealth Event System
  • The DriveWealth API set

Two real-world use cases that streamline Partner integration:
order.completed events
instruments.updated events

The order.completed event is a great way to notify customers about their orders being fulfilled; you can send them a push notification about this event.

The instruments.updated event is a great way to understand if an instrument is currently trading (or halted), instead of the customer receiving an 'Order not Accepted' Error,**
At this point a partner may decide to not show the instrument, or show that the instrument is currently unavailable.

See below for observed (average) timing differences between the two designs:

Caching layer

ActionInfrastructureEndpointResponse Time
User Wants to See Available Technology Sector InstrumentsPartner App to Partner BackendGET - /symbols?sectors=technology-
Partner Query Own Db for InstrumentsPartner Backend To Partner Database-EST(1000ms)
Send Data of Instruments that have the sector Technology Back To UserPartner Backend To Partner App-1000ms

Non - Caching Layer

N = 500+ Technology Sector Instruments*

ActionInfrastructureEndpointResponse Time
User Wants to See Available Technology Sector InstrumentsPartner App to Partner BackendGET - /symbols
Server To Server AuthenticationPartner Backend To DriveWealth BackendPOST - /authAVG(20ms)
Serve Need to Fetch Available Instruments From DriveWealthPartner Backend To DriveWealth BackendGET - /instrumentsAVG(1773ms
Loop through 5,000+ instrumentsPartnerO(N)
Server needs additional information about the instrument to find Technology sector instrumentsPartner Backend To DriveWealth Backend/instruments/{id}AVG(409ms) * N
Send Data of Instruments that have the sector Technology Back To UserPartner Backend To Partner App+200,000ms

Caching design

  • A customer places an order and is then refreshing their positions to see the change
  • In order to retrieve this account summary the partners’ app calls the partners’ backend which returns the account summary
  • The reason they are able to accomplish this is because the partner is receiving events from AMZN SQS. They are able to understand the current positions of a customer (asynchronously, with a Standard Queue Configuration, not FIFO) while updating their own database
641

Non - Caching Design

  • A customer places an order and is then refreshing their positions to see the change
  • To get this information the app calls the partner's backend, which then calls the DriveWealth backend to retrieve the customers’ current positions

Minimal Caching Implementation

Instruments

  • Instruments are the available investment products on the DriveWealth platform that have the ability to be purchased and sold. Implementing a data store of these available instruments allows partners to enable a quick search via their application.

Let’s take a look at 2 examples:

  • The 'list-all-instruments' & 'get-instrument-details' endpoints
  • Historical Charting Data - Historical data is used to understand instruments and their price movements every trading day. Charting is one of the great ways to demonstrate movements.

Example 1 - Here is a simple App & DB to store the data from our 'instruments-list-all', and 'details' endpoints

Postgres

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE INSTRUMENTS (
    drive_wealth_id UUID PRIMARY KEY NOT NULL,
    symbol CHARACTER(20) NOT NULL,
    name VARCHAR(255) NOT NULL,
    status VARCHAR NOT NULL,
    sector varchar(255),
    url VARCHAR(255),
    image VARCHAR(255),
    description TEXT
);

CREATE INDEX INST_IX ON INSTRUMENTS(drive_wealth_id);

Node.js Script

'use strict';

const Massive = require('massive');
const Winston = require('winston');

const DWC = require('./client');
const DWS = require('./service');

const {
    DB_URL,
    DW_USERNAME,
    DW_PASSWORD,
    DW_API_KEY,
    DW_BASE_URL,
    DW_WLP_ID,
    DW_PARENT_IBID
  } = process.env;

const logger = Winston.createLogger({
    level: 'debug',
    format: Winston.format.json(),
    transports: [
        new Winston.transports.Console()
    ]
});

/**
 * Batch Process To Refresh or BackFill Instruments & Instrument Details
 */
 const runBatch = async() => {

    const db = await Massive(DB_URL, { allowedSchemas: ['public'] });
    const dwc = new DWC(DW_BASE_URL, DW_USERNAME, DW_PASSWORD, DW_API_KEY, DW_WLP_ID, DW_PARENT_IBID, logger);
    const dws = new DWS(dwc, db, logger);

    await dws.refreshInstruments();
}

runBatch();
'use strict';

const Boom = require('@hapi/boom');
const Wreck = require('@hapi/wreck');

const IP = require('ip');
const Moment = require('moment');
const OS = require('os');

const Package = require('./package.json');

const GET = 'GET';
const POST = 'POST';
const PATCH = 'PATCH';

/**
 * DriveWealth client use to interface with DW APIs.
 * @class
 * @param {string} baseUrl - DW base URL
 * @param {string} username -  DW API user
 * @param {string} password - DW API password
 * @param {string} apiKey - Assigned API key
 * @param {string} wlpID - Assigned WLPID
 * @param {string} parentIBID - Assigned parentIBID
 * @param {object} logger - logger
 */
class DriveWealthClient {

    constructor(baseUrl, username, password, apiKey, wlpID, parentIBID, logger) {

        this.auth = {
            username,
            password,
            ip_address: IP.address(),
            languageID: 'en_US', 
            osVersion: OS.release(),
            osType: OS.type(),
            appVersion: Package.version,
            appTypeID: 4
        }
        this.logger = logger;
        this.wlpID = wlpID;
        this.parentIBID = parentIBID;

        this.wreck = Wreck.defaults({
            baseUrl,
            headers: { 'dw-client-app-key': apiKey }
        });
    }


    /**
     * Authorize the client to make API requests.
     * @async
     * @function #authorize
     */
    async #authorize() {

        const response = await this.#sendRequest(POST, `/back-office/auth`, { payload: this.auth, json: 'strict', 'content-type': 'application/json' });

        this.logger.info('Successfully Authenticated with DriveWealth');

        this.#setTokenExpiration(response.expiresAt);
        this.#setHeaderValue('dw-auth-token', response.authToken);
    }

    /**
     * Determines if the token is still valid.
     * @async 
     * @function #isSessionValid
     */
    async isSessionValid() {

        if(!this.tokenExpiration || Moment().isAfter(this.tokenExpiration)) {
            await this.#authorize();
        }
    }

    /**
     * Get all instruments from the DriveWealth platform.
     * @async
     * @function getInstruments
     */
    async getInstruments() {

        await this.isSessionValid();

        return this.#sendRequest(GET, '/back-office/instruments', { json: 'strict' });
    }

    /**
     * Get instrument details for a given instrument.
     * @param {string} id 
     * @returns {object} instrument details
     */
    async getInstrumentDetails(id) {
        
        await this.isSessionValid();

        return this.#sendRequest(GET, `back-office/instruments/${id}`, { json: 'strict' });
    }

    /** @this setsTokenExpiration */
    #setTokenExpiration(timestamp) {
        this.tokenExpiration = Moment(timestamp);
    }
    /** @this setsHeaderValue */
    #setHeaderValue(key, value) {
        this.wreck._defaults.headers[key] = value;
    }

    /**
     * Sends the request to DriveWealth
     * @param {string} method - the type of request being made
     * @param {string} uri - the path
     * @param {object} options - options for the request, defeault empty
     * @returns {object} payload
     */
    async #sendRequest(method, uri, options = {}) {

        try {
            const res = await this.wreck.request(method, uri, options);
            return Wreck.read(res, options);
        } 
        catch (error) {

            throw Boom.badImplementation(error);
        }
    }
}

module.exports = DriveWealthClient;
'use strict';

/**
 * DriveWealth service use to interface with DWClient and Database.
 * @class
 * @param {object} client - A client use to interface with DW APIs.
 * @param {object} db - A postgreSQL database interface.
 * @param {object} logger - A logger.
 */
class DriveWealthService {

    constructor(client, db, logger) {

        this._client = client;
        this._db = db;
        this._logger = logger;
    }

    /**
     * Gets all instruments 
     * @returns {object} list of instruments
     */
    getInstruments() {

        return this._client.getInstruments();
    }

    /**
     * Gets Instrument Details 
     * @param {uuid} id - DriveWealth InstrumentId
     * @returns {object} details of instrument
     */
    getInstrumentDetails(id) {

        return this._client.getInstrumentDetails(id);
    }


    /**
     * Insert an instrument on the db.
     * @param {object} data 
     * @returns {object} - inserted record
     */
    addInstrument(data) {

        return this._db.instruments.insert(data);
    }

    /**
     * Update an instrument on the db.
     * @param {object} criteria - update criteria
     * @param {object} data - attributes to be updated 
     * @returns {object} - updated record
     */
    updateInstrument(criteria, data) {

        return this._db.instruments.update(criteria, data);
    }
    
    /**
     * Finds a instrument on the db by Id.
     * @param {uuid} id - DriveWealth instrument id
     * @returns {object} - instrument if found
     */
    findInstrument(id) {
        
        return this._db.instruments.findOne({ drive_wealth_id: id });
    }

    /**
     * Refreshes All Instrument Data
     * @returns {boolean} when the process is successful
     */
    async refreshInstruments() {
    
        const instruments = await this.getInstruments();
        for(let i=0; i<= instruments.length-1; i++) {
            const { image, url, sector, description, status, id, name, symbol } = await this.getInstrumentDetails(instruments[i].id);

            const exists = await this.findInstrument(id);

            if(!exists) {
                await this.addInstrument({
                    drive_wealth_id: id,
                    symbol,
                    name,
                    sector,
                    url,
                    image,
                    status,
                    description
                });
            } 
            else {

                await this.updateInstrument({ drive_wealth_id: id }{
                    drive_wealth_id: id,
                    symbol,
                    name,
                    sector,
                    url,
                    image,
                    status,
                    description 
                });
            }
            //UAT RATE LIMITS
            await new Promise(resolve => setTimeout(resolve, 1000));
        }

        this._logger.info('Success: All Instruments Refreshed');
        return true;
    }
}

module.exports = DriveWealthService;

Caching historical data

PostgreSQL

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE HISTORICAL_MARKET_DATA (
    id uuid NOT NULL DEFAULT uuid_generate_v1(),
    drive_wealth_id uuid NOT NULL,
    date DATE NOT NULL,
    open INTEGER NOT NULL,
    high INTEGER NOT NULL,
    low INTEGER NOT NULL,
    close INTEGER NOT NULL,
    volume INTEGER NOT NULL,
    FOREIGN KEY (drive_wealth_id) REFERENCES INSTRUMENTS(drive_wealth_id),
    PRIMARY KEY (id) 
);

CREATE INDEX HIS_INS_DATA ON HISTORICAL_MARKET_DATA(drive_wealth_id, date);

Node.js Script

'use strict';

const Massive = require('massive');
const Winston = require('winston');

const DWC = require('./client');
const DWS = require('./service');

const {
    DB_URL,
    DW_USERNAME,
    DW_PASSWORD,
    DW_API_KEY,
    DW_BASE_URL,
    DW_WLP_ID,
    DW_PARENT_IBID
  } = process.env;

const logger = Winston.createLogger({
    level: 'debug',
    format: Winston.format.json(),
    transports: [
        new Winston.transports.Console()
    ]
});

/**
 * Batch Process To BackFill Historical Market Data
 */
 const backfill = async() => {

    const date = process.argv[2];
    
    if(!date || Date.parse(date)) {
        logger.error('Please specify a date in which to backfill from');
        process.exit(1);
    }

    const db = await Massive(DB_URL, { allowedSchemas: ['public'] });
    const dwc = new DWC(DW_BASE_URL, DW_USERNAME, DW_PASSWORD, DW_API_KEY, DW_WLP_ID, DW_PARENT_IBID, logger);
    const dws = new DWS(dwc, db, logger);

    await dws.backFillHistoricalMarketData();
}

backfill();
'use strict';

const Boom = require('@hapi/boom');
const Wreck = require('@hapi/wreck');

const IP = require('ip');
const Moment = require('moment');
const OS = require('os');

const Package = require('./package.json');

const GET = 'GET';
const POST = 'POST';
const PATCH = 'PATCH';

/**
 * DriveWealth client use to interface with DW APIs.
 * @class
 * @param {string} baseUrl - DW base URL
 * @param {string} username -  DW API user
 * @param {string} password - DW API password
 * @param {string} apiKey - Assigned API key
 * @param {string} wlpID - Assigned WLPID
 * @param {string} parentIBID - Assigned parentIBID
 * @param {object} logger - logger
 */
class DriveWealthClient {

    constructor(baseUrl, username, password, apiKey, wlpID, parentIBID, logger) {

        this.auth = {
            username,
            password,
            ip_address: IP.address(),
            languageID: 'en_US', 
            osVersion: OS.release(),
            osType: OS.type(),
            appVersion: Package.version,
            appTypeID: 4
        }
        this.logger = logger;
        this.wlpID = wlpID;
        this.parentIBID = parentIBID;

        this.wreck = Wreck.defaults({
            baseUrl,
            headers: { 'dw-client-app-key': apiKey }
        });
    }


    /**
     * Authorize the client to map API requests.
     * @async
     * @function #authorize
     */
    async #authorize() {

        const response = await this.#sendRequest(POST, `/back-office/auth`, { payload: this.auth, json: 'strict', 'content-type': 'application/json' });

        this.logger.info('Successfully Authenticated with DriveWealth');

        this.#setTokenExpiration(response.expiresAt);
        this.#setHeaderValue('dw-auth-token', response.authToken);
    }

    /**
     * Determines if the token is still valid.
     * @async 
     * @function #isSessionValid
     */
    async isSessionValid() {

        if(!this.tokenExpiration || Moment().isAfter(this.tokenExpiration)) {
            await this.#authorize();
        }
    }

    /**
     * Gets historical market data from a specified start/end date. 
     * @async
     * @function getHistoricalData
     */
    async getHistoricalData(dateStart, dateEnd, instrumentID) {

        await this.isSessionValid();

        return this.#sendRequest(GET, `/back-office/bars?instrumentID=${instrumentID}&compression=0&dateStart=${dateStart}&dateEnd=${dateEnd}`, { json: 'strict' });
    }


    /** @this setsTokenExpiration */
    #setTokenExpiration(timestamp) {
        this.tokenExpiration = Moment(timestamp);
    }
    /** @this setsHeaderValue */
    #setHeaderValue(key, value) {
        this.wreck._defaults.headers[key] = value;
    }

    /**
     * Sends the request to DriveWealth
     * @param {string} method - the type of request being made
     * @param {string} uri - the path
     * @param {object} options - options for the request, defeault empty
     * @returns {object} payload
     */
    async #sendRequest(method, uri, options = {}) {

        try {
            const res = await this.wreck.request(method, uri, options);
            return Wreck.read(res, options);
        } 
        catch (error) {

            throw Boom.badImplementation(error);
        }
    }
}

module.exports = DriveWealthClient;
'use strict';

const Moment = require('moment');

/**
 * DriveWealth service use to interface with DWClient and Database.
 * @class
 * @param {object} client - A client use to interface with DW APIs.
 * @param {object} db - A postgreSQL database interface.
 * @param {object} logger - A logger.
 */
class DriveWealthService {

    constructor(client, db, logger) {

        this._client = client;
        this._db = db;
        this._logger = logger;
    }

    /**
     * Get Historical Market Data 
     * @param {string} dateStart - ISODate get gata from
     * @param {string} dateEnd - ISODate get data to
     * @param {uuid} instrumentID - DriveWealth id
     * @returns {object} - historical data
     */
    getHistoricalData(dateStart, dateEnd, instrumentID) {

        return this._client.getHistoricalData(dateStart, dateEnd, instrumentID);
    }
    
    /**
     * Finds all instruments on the db.
     * @returns {object} - list of instruments
     */
    findInstruments() {
        
        return this._db.instruments.find();
    }

    /**
     * Find Historical Market Data on the db.
     * @param {object} - conditions on what historical data to find
     * @returns {object} - market data if found
     */
    findHistoricalData(args) {

        return this._db.historical_market_data.findOne(args)
    }

    /**
     * BackFills historical market data from date specified.
     * @returns {boolean} when the process is successful
     */
    async backFillHistoricalMarketData(fromDate) {
    
        const instruments = await this.findInstruments();
        const dateStart = Moment(fromDate).startOf('day').format('YYYY-MM-DDTHH:mm:ss[Z]').toString();
        const dateEnd = Moment().subtract(1, 'day').endOf('day').format('YYYY-MM-DDTHH:mm:ss[Z]').toString();
        for(let i=0; i<= instruments.length-1; i++) {

            const hisData = await this.getHistoricalData(dateStart, dateEnd, instruments[i].drive_wealth_id);
            
            //UAT Custom Instrument
            if(hisData.errorCode === 'Custom') {
                continue;
            }

            if(!hisData.data || hisData.data.trim() === '') {
                continue;
            }

            await this.handleCompressionData(hisData.data, instruments[i].drive_wealth_id);

            //UAT API RATE LIMIT
            await new Promise(resolve => setTimeout(resolve, 1000));
        }

        this._logger.info('Success: Market Data Backfilled');
        return true;
    }

    async handleCompressionData(data, instrumentId){

        const datesData = data.split('|');

        for(let j=0; j<=datesData.length-1; j++) {
            
            const dateData = datesData[j].split(',');
            const sData = {
                date: new Date(dateData[0]), 
                open: parseInt(dateData[1]),
                high: parseInt(dateData[2]),
                low: parseInt(dateData[3]),
                close: parseInt(dateData[4]),
                volume: parseInt(dateData[5])
            }

            if(await this.findHistoricalData({ date: sData.date, drive_wealth_id: instrumentId })) {
                continue;
            }
            await this.addhistoricData(sData, instrumentId);
        }
    }

    addhistoricData(data, instrumentId) {

        return this._db.historical_market_data.insert({
            drive_wealth_id: instrumentId,
            ...data
        });
    }
}

module.exports = DriveWealthService;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE HISTORICAL_MARKET_DATA (
    id uuid NOT NULL DEFAULT uuid_generate_v1(),
    drive_wealth_id uuid NOT NULL,
    date DATE NOT NULL,
    open INTEGER NOT NULL,
    high INTEGER NOT NULL,
    low INTEGER NOT NULL,
    close INTEGER NOT NULL,
    volume INTEGER NOT NULL,
    FOREIGN KEY (drive_wealth_id) REFERENCES INSTRUMENTS(drive_wealth_id),
    PRIMARY KEY (id) 
);

CREATE INDEX HIS_INS_DATA ON HISTORICAL_MARKET_DATA(drive_wealth_id, date);

Node.js Script

'use strict';

const Massive = require('massive');
const Winston = require('winston');

const DWC = require('./client');
const DWS = require('./service');

const {
    DB_URL,
    DW_USERNAME,
    DW_PASSWORD,
    DW_API_KEY,
    DW_BASE_URL,
    DW_WLP_ID,
    DW_PARENT_IBID
  } = process.env;

const logger = Winston.createLogger({
    level: 'debug',
    format: Winston.format.json(),
    transports: [
        new Winston.transports.Console()
    ]
});

/**
 * Batch Process To BackFill Historical Market Data
 */
 const backfill = async() => {

    const date = process.argv[2];
    
    if(!date || Date.parse(date)) {
        logger.error('Please specify a date in which to backfill from');
        process.exit(1);
    }

    const db = await Massive(DB_URL, { allowedSchemas: ['public'] });
    const dwc = new DWC(DW_BASE_URL, DW_USERNAME, DW_PASSWORD, DW_API_KEY, DW_WLP_ID, DW_PARENT_IBID, logger);
    const dws = new DWS(dwc, db, logger);

    await dws.backFillHistoricalMarketData();
}

backfill();
'use strict';

const Boom = require('@hapi/boom');
const Wreck = require('@hapi/wreck');

const IP = require('ip');
const Moment = require('moment');
const OS = require('os');

const Package = require('./package.json');

const GET = 'GET';
const POST = 'POST';
const PATCH = 'PATCH';

/**
 * DriveWealth client use to interface with DW APIs.
 * @class
 * @param {string} baseUrl - DW base URL
 * @param {string} username -  DW API user
 * @param {string} password - DW API password
 * @param {string} apiKey - Assigned API key
 * @param {string} wlpID - Assigned WLPID
 * @param {string} parentIBID - Assigned parentIBID
 * @param {object} logger - logger
 */
class DriveWealthClient {

    constructor(baseUrl, username, password, apiKey, wlpID, parentIBID, logger) {

        this.auth = {
            username,
            password,
            ip_address: IP.address(),
            languageID: 'en_US', 
            osVersion: OS.release(),
            osType: OS.type(),
            appVersion: Package.version,
            appTypeID: 4
        }
        this.logger = logger;
        this.wlpID = wlpID;
        this.parentIBID = parentIBID;

        this.wreck = Wreck.defaults({
            baseUrl,
            headers: { 'dw-client-app-key': apiKey }
        });
    }


    /**
     * Authorize the client to map API requests.
     * @async
     * @function #authorize
     */
    async #authorize() {

        const response = await this.#sendRequest(POST, `/back-office/auth`, { payload: this.auth, json: 'strict', 'content-type': 'application/json' });

        this.logger.info('Successfully Authenticated with DriveWealth');

        this.#setTokenExpiration(response.expiresAt);
        this.#setHeaderValue('dw-auth-token', response.authToken);
    }

    /**
     * Determines if the token is still valid.
     * @async 
     * @function #isSessionValid
     */
    async isSessionValid() {

        if(!this.tokenExpiration || Moment().isAfter(this.tokenExpiration)) {
            await this.#authorize();
        }
    }

    /**
     * Gets historical market data from a specified start/end date. 
     * @async
     * @function getHistoricalData
     */
    async getHistoricalData(dateStart, dateEnd, instrumentID) {

        await this.isSessionValid();

        return this.#sendRequest(GET, `/back-office/bars?instrumentID=${instrumentID}&compression=0&dateStart=${dateStart}&dateEnd=${dateEnd}`, { json: 'strict' });
    }


    /** @this setsTokenExpiration */
    #setTokenExpiration(timestamp) {
        this.tokenExpiration = Moment(timestamp);
    }
    /** @this setsHeaderValue */
    #setHeaderValue(key, value) {
        this.wreck._defaults.headers[key] = value;
    }

    /**
     * Sends the request to DriveWealth
     * @param {string} method - the type of request being made
     * @param {string} uri - the path
     * @param {object} options - options for the request, defeault empty
     * @returns {object} payload
     */
    async #sendRequest(method, uri, options = {}) {

        try {
            const res = await this.wreck.request(method, uri, options);
            return Wreck.read(res, options);
        } 
        catch (error) {

            throw Boom.badImplementation(error);
        }
    }
}

module.exports = DriveWealthClient;
'use strict';

const Moment = require('moment');

/**
 * DriveWealth service use to interface with DWClient and Database.
 * @class
 * @param {object} client - A client use to interface with DW APIs.
 * @param {object} db - A postgreSQL database interface.
 * @param {object} logger - A logger.
 */
class DriveWealthService {

    constructor(client, db, logger) {

        this._client = client;
        this._db = db;
        this._logger = logger;
    }

    /**
     * Get Historical Market Data 
     * @param {string} dateStart - ISODate get gata from
     * @param {string} dateEnd - ISODate get data to
     * @param {uuid} instrumentID - DriveWealth id
     * @returns {object} - historical data
     */
    getHistoricalData(dateStart, dateEnd, instrumentID) {

        return this._client.getHistoricalData(dateStart, dateEnd, instrumentID);
    }
    
    /**
     * Finds all instruments on the db.
     * @returns {object} - list of instruments
     */
    findInstruments() {
        
        return this._db.instruments.find();
    }

    /**
     * Find Historical Market Data on the db.
     * @param {object} - conditions on what historical data to find
     * @returns {object} - market data if found
     */
    findHistoricalData(args) {

        return this._db.historical_market_data.findOne(args)
    }

    /**
     * BackFills historical market data from date specified.
     * @returns {boolean} when the process is successful
     */
    async backFillHistoricalMarketData(fromDate) {
    
        const instruments = await this.findInstruments();
        const dateStart = Moment(fromDate).startOf('day').format('YYYY-MM-DDTHH:mm:ss[Z]').toString();
        const dateEnd = Moment().subtract(1, 'day').endOf('day').format('YYYY-MM-DDTHH:mm:ss[Z]').toString();
        for(let i=0; i<= instruments.length-1; i++) {

            const hisData = await this.getHistoricalData(dateStart, dateEnd, instruments[i].drive_wealth_id);
            
            //UAT Custom Instrument
            if(hisData.errorCode === 'Custom') {
                continue;
            }

            if(!hisData.data || hisData.data.trim() === '') {
                continue;
            }

            await this.handleCompressionData(hisData.data, instruments[i].drive_wealth_id);

            //UAT API RATE LIMIT
            await new Promise(resolve => setTimeout(resolve, 1000));
        }

        this._logger.info('Success: Market Data Backfilled');
        return true;
    }

    async handleCompressionData(data, instrumentId){

        const datesData = data.split('|');

        for(let j=0; j<=datesData.length-1; j++) {
            
            const dateData = datesData[j].split(',');
            const sData = {
                date: new Date(dateData[0]), 
                open: parseInt(dateData[1]),
                high: parseInt(dateData[2]),
                low: parseInt(dateData[3]),
                close: parseInt(dateData[4]),
                volume: parseInt(dateData[5])
            }

            if(await this.findHistoricalData({ date: sData.date, drive_wealth_id: instrumentId })) {
                continue;
            }
            await this.addhistoricData(sData, instrumentId);
        }
    }

    addhistoricData(data, instrumentId) {

        return this._db.historical_market_data.insert({
            drive_wealth_id: instrumentId,
            ...data
        });
    }
}

module.exports = DriveWealthService;