Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network graph #30

Open
Efkovole opened this issue Oct 8, 2024 · 2 comments
Open

Network graph #30

Efkovole opened this issue Oct 8, 2024 · 2 comments

Comments

@Efkovole
Copy link

Efkovole commented Oct 8, 2024

I'd like to thank you in advance for your work.
However, I have a problem with the network graph. It gets stuck after a few minutes.

@Efkovole
Copy link
Author

Efkovole commented Oct 8, 2024

I fixed the code in network.js and network_thread.js:

↓↓network.js ↓↓

'use strict';
/*!
 * s1panel - sensor/network
 * Copyright (c) 2024 Tomasz Jaworski (Fixed Filip Šmída)
 * GPL-3 Licensed
 */
const threads = require('worker_threads');
const logger = require('../logger');

/**
 * Record a new sample in a fixed-length history buffer.
 *
 * An empty buffer is first seeded with `max_points` zeros. The new value is
 * then appended and the oldest entry removed, so the push/shift pair leaves
 * the buffer length unchanged.
 *
 * @param {number[]} array - history buffer, mutated in place
 * @param {number} value - newest sample to record
 * @param {number} max_points - number of zeros used to seed an empty buffer
 * @returns {number} the value that was recorded
 */
function record_sample(array, value, max_points) {
    if (array.length === 0) {
        // first use: pre-fill so the history always has a full window
        while (array.length < max_points) {
            array.push(0);
        }
    }

    array.push(value);
    array.shift();

    return value;
}

/**
 * Format a per-second byte count as a human readable data rate.
 *
 * The amount is scaled by powers of 1024. Byte rates are printed with two
 * decimals, bit rates with none.
 *
 * @param {number} bytes - amount in bytes
 * @param {boolean} [bits] - when truthy, convert to bits (bytes * 8) and use
 *                           the bit/s family of unit labels
 * @returns {string} formatted rate, e.g. '2.00 Kb/s' or '8 Mbit/s'
 */
function bytes_to_data_rate(bytes, bits) {
    const amount = bits ? bytes * 8 : bytes;
    const decimals = bits ? 0 : 2;

    const kb = amount / 1024;
    const mb = kb / 1024;
    const gb = mb / 1024;

    if (gb >= 1) {
        return gb.toFixed(decimals) + (bits ? ' Gbit/s' : ' Gb/s');
    }
    if (mb >= 1) {
        return mb.toFixed(decimals) + (bits ? ' Mbit/s' : ' Mb/s');
    }
    if (kb >= 1) {
        return kb.toFixed(decimals) + (bits ? ' kbit/s' : ' Kb/s');
    }

    // below one kilo-unit: print the converted amount (bits when requested),
    // not the raw byte count
    return amount + (bits ? ' bit/s' : ' B/s');
}

/**
 * Convert a link speed into the matching byte capacity.
 *
 * The speed is divided by 8 (bits to bytes) and scaled by 1024 * 1024.
 * NOTE(review): presumably `link_speed` is in Mbit/s and the result is in
 * bytes per second - confirm against the source of the speed value.
 *
 * @param {number} link_speed - link speed
 * @returns {number} capacity in bytes
 */
function max_link_capacity_bytes(link_speed) {
    const BITS_PER_BYTE = 8;
    const MEGA = 1024 * 1024;

    return (link_speed / BITS_PER_BYTE) * MEGA;
}

/**
 * Render the current network readings into `format`.
 *
 * When more than `rate` milliseconds have passed since the previous poll (or
 * on the first call) a message is posted to the worker thread, and the
 * per-history maxima are refreshed if the worker has delivered data since the
 * last poll.
 *
 * Placeholders understood in `format`:
 *   {0} interface name
 *   {1} latest rx bytes          {2} rx bytes history (comma joined)
 *   {3} latest rx as byte rate   {4} latest tx bytes
 *   {5} tx bytes history         {6} latest tx as byte rate
 *   {7} latest rx packets        {8} latest rx packets + ' rx/pps'
 *   {9} rx packets history       {10} latest tx packets
 *   {11} latest tx packets + ' tx/pps'
 *   {12} tx packets history      {13} link speed
 *   {14} link mtu                {15} latest rx as bit rate
 *   {16} latest tx as bit rate   {17} ipv4 address
 *   {18} ipv6 address            anything else -> 'null'
 *
 * Placeholders {1}, {4}, {7} and {10} also select the returned `max`; when
 * several are present the last one in `format` wins.
 *
 * @param {number} rate - minimum interval between worker polls, in ms
 * @param {string} format - template containing {n} placeholders
 * @param {object} config - sensor config carrying the `_private` state set by init()
 * @returns {Promise<{value: string, min: number, max: number}>}
 */
async function sample(rate, format, config) {
    const _private = config._private;
    // read the monotonic clock once so the elapsed check and the stored
    // timestamp agree
    const _now = Math.floor(Number(process.hrtime.bigint()) / 1000000);
    const _diff = _private.last_sampled ? _now - _private.last_sampled : 0;
    let _dirty = false;

    if (!_private.last_sampled || _diff > rate) {
        _private.last_sampled = _now;
        _private.worker.postMessage({ iface: _private.iface, rate: rate });
        _dirty = _private.thread_checkin_count ? true : false;
        _private.thread_checkin_count = 0;
    }

    if (_dirty) {
        _private.max_rx_bytes = Math.max(..._private.history_rx_bytes);
        _private.max_tx_bytes = Math.max(..._private.history_tx_bytes);
        _private.max_rx_packets = Math.max(..._private.history_rx_packets);
        _private.max_tx_packets = Math.max(..._private.history_tx_packets);
    }

    const _link_capacity = max_link_capacity_bytes(_private.link_speed);
    // A link speed of 0 or below (or a non-number) means the speed is unknown.
    // Using it as a cap would pin the axis maximum at or below zero.
    const _has_capacity = Number.isFinite(_link_capacity) && _link_capacity > 0;

    // axis maximum for a byte history: scaled peak, capped by the link
    // capacity only when that capacity is known
    const byte_axis_max = (peak) => {
        const _scaled = _private.scale_factor ? Math.ceil(peak * _private.scale_factor) : peak;

        if (!_has_capacity) {
            return _scaled;
        }
        return _private.scale_factor ? Math.min(_scaled, _link_capacity) : _link_capacity;
    };

    // newest entry of a history, or 0 while the history is still empty
    const latest = (history) => (history.length > 0 ? history[history.length - 1] : 0);

    let _max = _has_capacity ? _link_capacity : 0;

    const _output = format.replace(/{(\d+)}/g, (match, number) => {
        switch (number) {
            case '0':
                return _private.iface;
            case '1': // download
                _max = byte_axis_max(_private.max_rx_bytes);
                return latest(_private.history_rx_bytes);
            case '2':
                return _private.history_rx_bytes.join();
            case '3':
                return _private.history_rx_bytes.length > 0 ? bytes_to_data_rate(latest(_private.history_rx_bytes)) : '0 B/s';
            case '4': // upload
                _max = byte_axis_max(_private.max_tx_bytes);
                return latest(_private.history_tx_bytes);
            case '5':
                return _private.history_tx_bytes.join();
            case '6':
                return _private.history_tx_bytes.length > 0 ? bytes_to_data_rate(latest(_private.history_tx_bytes)) : '0 B/s';
            case '7': // download packets
                _max = _private.max_rx_packets;
                return latest(_private.history_rx_packets);
            case '8':
                return latest(_private.history_rx_packets) + ' rx/pps';
            case '9':
                return _private.history_rx_packets.join();
            case '10': // upload packets
                _max = _private.max_tx_packets;
                return latest(_private.history_tx_packets);
            case '11':
                return latest(_private.history_tx_packets) + ' tx/pps';
            case '12':
                return _private.history_tx_packets.join();
            case '13':
                return _private.link_speed;
            case '14':
                return _private.link_mtu;
            case '15':  // bits
                return _private.history_rx_bytes.length > 0 ? bytes_to_data_rate(latest(_private.history_rx_bytes), true) : '0 bit/s';
            case '16':
                return _private.history_tx_bytes.length > 0 ? bytes_to_data_rate(latest(_private.history_tx_bytes), true) : '0 bit/s';
            case '17':
                return _private.ipv4;
            case '18':
                return _private.ipv6;
            default:
                return 'null';
        }
    });

    return { value: _output, min: 0, max: _max };
}

/**
 * Initialise the network sensor.
 *
 * Builds the private state (history buffers, maxima, link defaults), starts
 * the collector worker thread and stores the state on `config._private`.
 *
 * Config keys read: `max_points` (default 300), `interface` (default
 * 'enp2s0') and `scaling` (default 1.5).
 *
 * @param {object} config - sensor configuration; receives `_private`
 * @returns {string} sensor name, 'network_' followed by the interface name
 */
function init(config) {
    const _private = {
        max_points: config?.max_points || 300,
        iface: config?.interface || 'enp2s0',
        history_rx_bytes: [],
        history_tx_bytes: [],
        max_rx_bytes: 0,
        max_tx_bytes: 0,
        history_rx_packets: [],
        history_tx_packets: [],
        max_rx_packets: 0,
        max_tx_packets: 0,
        link_speed: 1000,
        link_mtu: 1500,
        scale_factor: config?.scaling || 1.5,
        thread_checkin_count: 0
    };

    logger.info('initialize: monitoring interface ' + _private.iface);
    logger.info('initialize: network max points are set to ' + _private.max_points);

    // Create a worker with all listeners attached. The same routine is used
    // for the first worker and for every replacement, so a restarted worker
    // keeps feeding the histories and its errors stay handled.
    const spawn_worker = () => {
        const _worker = new threads.Worker(__dirname + '/network_thread.js', { workerData: { iface: _private.iface } });

        _worker.on('message', message => {
            if (!message.rx || !message.tx) {
                logger.error('Chybí data ve zprávě workeru:', message);
                return;
            }

            _private.link_mtu = message.mtu;
            _private.link_speed = message.speed;

            record_sample(_private.history_rx_bytes, message.rx.bytes, _private.max_points);
            record_sample(_private.history_tx_bytes, message.tx.bytes, _private.max_points);

            record_sample(_private.history_rx_packets, message.rx.packets, _private.max_points);
            record_sample(_private.history_tx_packets, message.tx.packets, _private.max_points);

            _private.ipv4 = message.ipv4;
            _private.ipv6 = message.ipv6;

            _private.thread_checkin_count++;
        });

        _worker.on('error', (err) => {
            logger.error('Worker error:', err);
            _worker.terminate(); // stop the failed worker

            // replace it only if it is still the active worker, so a stale
            // worker's late error cannot spawn a duplicate
            if (_private.worker === _worker) {
                _private.worker = spawn_worker();
            }
        });

        _worker.on('exit', (code) => {
            if (code !== 0) {
                logger.error(`Worker stopped with exit code ${code}`);
            }
        });

        return _worker;
    };

    _private.worker = spawn_worker();

    config._private = _private;

    return 'network_' + _private.iface;
}

module.exports = {
    init,
    sample
};

↓↓ network_thread.js ↓↓

'use strict';
/*!
 * s1panel - sensor/network_thread
 * Copyright (c) 2024 Tomasz Jaworski
 * GPL-3 Licensed
 */
const fs = require('fs');
const threads = require('worker_threads');
const logger = require('../logger');
const { exec } = require('child_process');

// Delay between collection cycles when the parent's message carries no rate.
const DEFAULT_RATE_MS = 1000;
// collect() stops once this many cycles have run since the last parent
// message (every parent message resets _collect_count to 0).
const TIMEOUT_COUNT = 30;

// True while a collect() loop is active; prevents starting a second loop.
let _running = false;
// Number of collect() calls since the last message from the parent thread.
let _collect_count = 0;

// Set after the first failed `ip` command so read_ip() logs that failure once.
let _fault = false;

/**
 * Read a file as UTF-8 text.
 *
 * @param {string} path - file to read
 * @returns {Promise<string>} resolves with the file contents; on a read error
 *          the failure is logged and the promise rejects with an Error
 *          carrying the path and the underlying message
 */
function read_file(path) {
    return new Promise((resolve, reject) => {
        const on_done = (err, data) => {
            if (!err) {
                resolve(data);
                return;
            }

            logger.error(`network_thread: Error reading file ${path}: ${err.message}`);
            reject(new Error(`Error reading file ${path}: ${err.message}`));
        };

        fs.readFile(path, 'utf8', on_done);
    });
}

/**
 * Run a command line through child_process.exec.
 *
 * @param {string} cmdline - command line to execute
 * @returns {Promise<string>} resolves with the command's stdout; on failure
 *          the error is logged and the promise rejects with an Error
 *          carrying the command line and the underlying message
 */
function run_command(cmdline) {
    return new Promise((resolve, reject) => {
        exec(cmdline, (error, stdout) => {
            if (error) {
                logger.error(`network_thread: Command failed (${cmdline}): ${error.message}`);
                reject(new Error(`Command failed (${cmdline}): ${error.message}`));
                return;
            }

            resolve(stdout);
        });
    });
}

/**
 * Query the addresses of an interface with `ip -j a show dev <iface>`.
 *
 * @param {string} iface - interface name, interpolated into the command line
 * @returns {Promise<*>} resolves with the parsed JSON output, or with null
 *          when the command itself fails (that failure is logged only the
 *          first time, tracked by `_fault`); rejects when the output is not
 *          valid JSON
 */
async function read_ip(iface) {
    const _cmdline = `ip -j a show dev ${iface}`;
    let _stdout;

    try {
        _stdout = await run_command(_cmdline);
    } catch (err) {
        if (!_fault) {
            logger.error(`network_thread: sensors reported error: ${err.message}`);
            _fault = true;
        }
        // carry on even when the command failed
        return null;
    }

    try {
        return JSON.parse(_stdout);
    } catch (parseError) {
        logger.error(`network_thread: JSON parse error for IP data: ${parseError.message}`);
        throw new Error(`JSON parse error for IP data: ${parseError.message}`);
    }
}

/**
 * Gather the raw readings for one interface.
 *
 * Reads mtu, speed and the rx/tx byte and packet counters from
 * /sys/class/net/<iface>, and the address list via read_ip(). Each source
 * that fails is replaced by null, so the combined promise does not reject
 * because of a single failed read.
 *
 * @param {string} iface - interface name
 * @returns {Promise<Array>} seven entries in this order:
 *          mtu, speed, rx_bytes, tx_bytes, rx_packets, tx_packets, ip data
 */
function network_usage(iface) {
    const _iface_dir = `/sys/class/net/${iface}`;
    const _stats_dir = `${_iface_dir}/statistics`;

    const _files = [
        `${_iface_dir}/mtu`,
        `${_iface_dir}/speed`,
        `${_stats_dir}/rx_bytes`,
        `${_stats_dir}/tx_bytes`,
        `${_stats_dir}/rx_packets`,
        `${_stats_dir}/tx_packets`
    ];

    const _null_on_failure = () => null;

    const _pending = _files.map(file => read_file(file).catch(_null_on_failure));
    _pending.push(read_ip(iface).catch(_null_on_failure));

    return Promise.all(_pending);
}

// Counter readings from the previous collection cycle. collect() subtracts
// them from the current readings to get per-cycle deltas, and treats a value
// of 0 as "no previous reading" (delta reported as 0).
let _last_rx_bytes = 0;
let _last_tx_bytes = 0;
let _last_rx_packets = 0;
let _last_tx_packets = 0;

/**
 * Difference between two readings of a cumulative counter.
 *
 * Returns 0 when there is no previous reading, and also when the difference
 * is negative or not a number (counter reset, or a failed read that turned
 * into 0), so a bogus negative rate is never reported.
 */
function counter_delta(previous, current) {
    if (!previous) {
        return 0;
    }

    const _delta = current - previous;

    return _delta > 0 ? _delta : 0;
}

/**
 * Pick the first global-scope IPv4 and IPv6 address from parsed `ip -j`
 * output. Tolerates missing or malformed data; unknown addresses are 'n/a'.
 */
function first_global_addresses(ip_data) {
    const _found = { ipv4: 'n/a', ipv6: 'n/a' };
    let _have_ipv4 = false;
    let _have_ipv6 = false;

    if (!Array.isArray(ip_data)) {
        return _found;
    }

    for (const _entry of ip_data) {
        const _infos = _entry && Array.isArray(_entry.addr_info) ? _entry.addr_info : [];

        for (const _info of _infos) {
            if (!_info || 'global' !== _info.scope) {
                continue;
            }
            if ('inet' === _info.family && !_have_ipv4) {
                _found.ipv4 = _info.local;
                _have_ipv4 = true;
            } else if ('inet6' === _info.family && !_have_ipv6) {
                _found.ipv6 = _info.local;
                _have_ipv6 = true;
            }
        }
    }

    return _found;
}

/**
 * Run one collection cycle and schedule the next one.
 *
 * Reads the interface data, posts mtu, speed, the rx/tx deltas since the
 * previous cycle and the addresses to the parent thread, then re-arms itself
 * after `message.rate` ms (DEFAULT_RATE_MS when no rate is given). The loop
 * ends once TIMEOUT_COUNT cycles have run without a new parent message.
 *
 * @param {{iface: string, rate?: number}} message - request from the parent
 */
function collect(message) {
    _collect_count++;

    if (_collect_count >= TIMEOUT_COUNT) {
        logger.info('network_thread: collector stopped for iface ' + message.iface);
        _running = false;
        return;
    }

    // The next cycle is scheduled on every outcome (success, incomplete data,
    // error). Otherwise the loop would die while _running stays true and the
    // collector could never be restarted.
    const _schedule_next = () => {
        setTimeout(() => {
            collect(message);
        }, message.rate || DEFAULT_RATE_MS);
    };

    network_usage(message.iface)
        .then(results => {
            if (!results || results.length < 7) {
                logger.error('network_thread: Incomplete network usage data received.');
                return;
            }

            const _mtu = Number(results[0]);
            const _speed = Number(results[1]);

            const _current_rx_bytes = Number(results[2]);
            const _current_tx_bytes = Number(results[3]);

            const _current_rx_packets = Number(results[4]);
            const _current_tx_packets = Number(results[5]);

            // check that the readings are valid numbers
            if (Number.isNaN(_mtu) || Number.isNaN(_speed)) {
                logger.error('network_thread: Invalid MTU or speed data.');
            }

            if (Number.isNaN(_current_rx_bytes) || Number.isNaN(_current_tx_bytes) ||
                Number.isNaN(_current_rx_packets) || Number.isNaN(_current_tx_packets)) {
                logger.error('network_thread: Invalid delta values.');
            }

            const _delta_rx_bytes = counter_delta(_last_rx_bytes, _current_rx_bytes);
            const _delta_tx_bytes = counter_delta(_last_tx_bytes, _current_tx_bytes);

            const _delta_rx_packets = counter_delta(_last_rx_packets, _current_rx_packets);
            const _delta_tx_packets = counter_delta(_last_tx_packets, _current_tx_packets);

            _last_rx_bytes = _current_rx_bytes;
            _last_tx_bytes = _current_tx_bytes;

            _last_rx_packets = _current_rx_packets;
            _last_tx_packets = _current_tx_packets;

            const _addresses = first_global_addresses(results[6]);

            threads.parentPort.postMessage({
                mtu: _mtu || 0,
                speed: _speed || 0,
                rx: { bytes: _delta_rx_bytes, packets: _delta_rx_packets },
                tx: { bytes: _delta_tx_bytes, packets: _delta_tx_packets },
                ipv4: _addresses.ipv4,
                ipv6: _addresses.ipv6
            });
        })
        .catch(err => {
            // keep going on errors so the collector does not stop
            logger.error(`network_thread: Error during data collection: ${err.message}`);
        })
        .finally(_schedule_next);
}

// Every message from the parent resets the idle counter, which keeps the
// collect() loop alive. A loop is only started when none is active.
threads.parentPort.on('message', message => {
    _collect_count = 0; // reset

    if (_running) {
        return;
    }

    _running = true;
    logger.info('network_thread: collector started for iface ' + message.iface);
    collect(message);
});

logger.info('network_thread: started... for ' + threads.workerData.iface);

@tjaworski
Copy link
Owner

Do you know why you were getting invalid values?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants