Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C++ Runtime Error when creating the second worker thread that importing the msnodesqlv8 module #228

Closed
qbaonguyen98 opened this issue Jan 24, 2022 · 6 comments

Comments

@qbaonguyen98
Copy link

qbaonguyen98 commented Jan 24, 2022

I am trying to create multiple worker threads utilizing the msnodesqlv8 module, but it shown the C++ runtime error:
image

  • msnodesqlv8 version: 2.4.4

  • Project files structure:
    image

  • index.js code:
    image

  • dbWorker.js code:
    image

Any solution? Thanks.

@TimelordUK
Copy link
Owner

TimelordUK commented Jan 24, 2022

The c++ already runs on a threadpool, I am not sure what is gained from above. You can create a connection pool as below which will dispatch work across an allocated set of connections. ODBC permits only 1 command active per connection so each connection is itself a queue.

You would have to require the library top level in one module and create your worker passing in the sql instance.

But i do not see why you would not use a pool to balance work load.

import {
    PoolOptions,  
    SqlClient,  
    Pool,  
    Connection, 
    QueryAggregatorResults, 
    ConnectionPromises, 
    BulkTableMgr
} from 'msnodesqlv8';



export const sql : SqlClient = require('msnodesqlv8');


async function pool() {
    try {
        const connStr: string = getConnection()
        const size = 4
        const options: PoolOptions = {
            connectionString: connStr,
            ceiling: size
        }
        const pool: Pool = new sql.Pool(options)
        await pool.promises.open()
        const all = Array(size * 2).fill(0).map((_, i) => pool.promises.query(`select ${i} as i, @@SPID as spid`))
        const promised: QueryAggregatorResults[] = await Promise.all(all)
        const res = promised.map(r => r.first[0].spid)
        await pool.promises.close()
        console.log(`pool spids ${res.join(', ')}`)
    } catch (e) {
        console.log(e)
    }
}

@qbaonguyen98
Copy link
Author

qbaonguyen98 commented Jan 25, 2022

I need the worker threads to make multiple heavy calculation, each of which needs a mssql connection. So the worker threads are not aimed to replace the pools. Look like the problem is just "whenever I import the 'msnodesqlv8' module inside worker script, creating 2 workers will run into the C++ runtime error".

@TimelordUK
Copy link
Owner

unfortunately this is not currently supported - the C++ itself has some static data which cannot be shared across threads. I am not sure right now how technically easy it would be to solve this.

@TimelordUK
Copy link
Owner

not ideal but this may be OK providing the sql queries do not return too many rows

this was thrown together but idea is main creates a single connection pool then services sql from workers and returns back results worker then runs expensive query which could be based on results.

main.js

const path = require('path')
const filePath = path.resolve(__dirname, './worker-item.js')
const sql = require('msnodesqlv8')
const connString = 'Driver={ODBC Driver 17 for SQL Server};Server=(localdb)\\node;Database=scratch;Trusted_Connection=yes;'
const {Worker} = require("worker_threads")

//Create new worker
const worker = new Worker(filePath);

const pool = new sql.Pool({
  connectionString: connString
})

pool.on('open', (options) => {
  console.log(`ready options = ${JSON.stringify(options, null, 4)}`)
})

pool.on('debug', msg => {
  // console.log(`\t\t\t\t\t\t${new Date().toLocaleTimeString()} <pool.debug> ${msg}`)
})

pool.on('status', s => {
  // console.log(`status = ${JSON.stringify(s, null, 4)}`)
})

pool.on('error', e => {
  console.log(e)
})

pool.open()

//Listen for a message from worker
worker.on("message", msg => {
  switch (msg.command) {
    case 'sql_query': {
      console.log(`main: request to exec sql ${msg.sql}`)
      pool.query(msg.sql, (e, res) => {
        worker.postMessage(
          {
            data: msg.data,
            query_id: msg.query_id,
            command: 'sql_result',
            rows: res,
            error: e
          })
      })
      break
    }

    case 'task_result': {
      console.log(JSON.stringify(msg, null, 4))
    }
  }
})

worker.on("error", error => {
  console.log(error);
})

worker.postMessage(
  {
    command: 'task',
    num: 40
  })

worker-item.js

const { parentPort } = require("worker_threads")
let queryId = 0

parentPort.on("message", msg => {
  switch (msg.command) {

    case 'task': {
      let query = ++queryId
      console.log(`worker receive task ${msg.num}`)
      // let main master thread run query on pool and post back results
      parentPort.postMessage(
        {
          query_id: query,
          sql: `select ${msg.num} as i, @@SPID as spid`,
          command: 'sql_query',
          data: msg.num,
        })
    }
    break
     // now we have sql results run expensive calc
    case 'sql_result': {
      parentPort.postMessage(
        {
          command: 'task_result',
          data: `spid ${msg.rows[0].spid}`,
          num: msg.data,
          fib: getFib(msg.data)
        })
    }
  }
})

function getFib(num) {
  if (num === 0) {
    return 0;
  }
  else if (num === 1) {
    return 1;
  }
  else {
    return getFib(num - 1) + getFib(num - 2);
  }
}

@qbaonguyen98
Copy link
Author

qbaonguyen98 commented Jan 27, 2022

Thanks @TimelordUK. Look like the messaging between main and workers is the only solution right now.

@TimelordUK
Copy link
Owner

I will look at the c++ to see if I can find the reason for this failure - are you able to build the c++?

all I have done so far is this change these 2 functions in OdbcHandle.cpp

it does seem to be running OK but would require a lot more testing

	bool OdbcHandle::alloc()
	{
		if (handle) return true; 
		assert(handle == SQL_NULL_HANDLE);
		const auto ret = SQLAllocHandle(HandleType, nullptr, &handle);
		if (!SQL_SUCCEEDED(ret))
		{
			handle = nullptr;
			return false;
		}
		return true;
	}
	
	bool OdbcHandle::alloc(const OdbcHandle &parent)
	{
		if (handle) return true;
		assert(handle == SQL_NULL_HANDLE);
		const auto ret = SQLAllocHandle(HandleType, parent, &handle);
		//fprintf(stderr, "Alloc OdbcHandle %i %p\n", HandleType, handle);
		if (!SQL_SUCCEEDED(ret))
		{
			handle = nullptr;
			return false;
		}
		return true;
	}

i.e. the worker becomes

worker-item

const { parentPort } = require('worker_threads')
const sql = require('msnodesqlv8')

function getConnection () {
  const path = require('path')
  const config = require(path.join(__dirname, 'config.json'))
  return config.connection.local
}

const connectionString = getConnection()

parentPort.on('message', async msg => {
  switch (msg.command) {
    case 'task': {
      console.log(`worker receive task ${msg.num}`)
      const conn = await sql.promises.open(connectionString)
      const query = `select ${msg.num} as i, @@SPID as spid`
      const res = await conn.promises.query(query)
      await conn.promises.close()
      parentPort.postMessage(
        {
          command: 'task_result',
          data: `spid ${res.first[0].spid}`,
          num: msg.num,
          fib: getFib(msg.num)
        })
      // let main master thread run query on pool and post back results
      /*
      parentPort.postMessage(
        {
          query_id: query,
          sql: `select ${msg.num} as i, @@SPID as spid`,
          command: 'sql_query',
          data: msg.num
        })
        */
    }
      break
    // now we have sql results run expensive calc
    case 'sql_result': {
      parentPort.postMessage(
        {
          command: 'task_result',
          data: `spid ${msg.rows[0].spid}`,
          num: msg.data,
          fib: getFib(msg.data)
        })
    }
  }
})

function getFib (num) {
  if (num === 0) {
    return 0
  } else if (num === 1) {
    return 1
  } else {
    return getFib(num - 1) + getFib(num - 2)
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants