Skip to content

Commit

Permalink
work in progress for #168
Browse files Browse the repository at this point in the history
  • Loading branch information
mmguero committed Apr 6, 2023
1 parent 9fe07cd commit be05fab
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 262 deletions.
9 changes: 4 additions & 5 deletions filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def file_processor(pathname, **kwargs):
uid = kwargs["uid"]
gid = kwargs["gid"]
destination = kwargs["destination"]
logger = kwargs["logger"]
logger = kwargs["logger"] if "logger" in kwargs and kwargs["logger"] else logging

logger.info(f"{scriptName}:\t👓\t{pathname}")

Expand All @@ -72,12 +72,12 @@ def file_processor(pathname, **kwargs):

if fileMime in mime_types:
# looks like this is a compressed file, we're assuming it's a zeek log archive to be processed by filebeat
logger.info(f"{scriptName}:\t🖅\t{pathname} ({fileMime}) to {destination}")
logger.info(f"{scriptName}:\t🖅\t{pathname} [{fileMime}] to {destination}")
shutil.move(pathname, os.path.join(destination, os.path.basename(pathname)))

else:
# unhandled file type uploaded, delete it
logger.warning(f"{scriptName}:\t🗑\t{pathname} ({fileMime})")
logger.warning(f"{scriptName}:\t🗑\t{pathname} [{fileMime}]")
os.unlink(pathname)

except Exception as genericError:
Expand Down Expand Up @@ -210,8 +210,7 @@ def main():

# if directory to monitor doesn't exist, create it now
if not os.path.isdir(args.srcDir):
if debug:
eprint(f'{scriptName}:\tcreating "{args.srcDir}" to monitor')
logging.info(f'{scriptName}:\tcreating "{args.srcDir}" to monitor')
pathlib.Path(args.srcDir).mkdir(parents=False, exist_ok=True)

# if recursion was requested, get list of directories to monitor
Expand Down
6 changes: 3 additions & 3 deletions pcap-monitor/scripts/watch-pcap-uploads-folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def file_processor(pathname, **kwargs):
gid = kwargs["gid"]
pcapDir = kwargs["destination"]
zeekDir = kwargs["zeek"]
logger = kwargs["logger"]
logger = kwargs["logger"] if "logger" in kwargs and kwargs["logger"] else logging

logger.info(f"{scriptName}:\t👓\t{pathname}")

Expand All @@ -61,7 +61,7 @@ def file_processor(pathname, **kwargs):
(fileMime in ('application/vnd.tcpdump.pcap', 'application/x-pcapng')) or ('pcap-ng' in fileType)
):
# a pcap file to be processed by dropping it into pcapDir
logger.info(f"{scriptName}:\t🖅\t{pathname} ({fileMime}/{fileType}) to {pcapDir}")
logger.info(f"{scriptName}:\t🖅\t{pathname} [{fileMime}][{fileType}] to {pcapDir}")
shutil.move(pathname, os.path.join(pcapDir, os.path.basename(pathname)))

elif os.path.isdir(zeekDir) and (
Expand All @@ -81,7 +81,7 @@ def file_processor(pathname, **kwargs):
]
):
# looks like this is a compressed file, we're assuming it's a zeek log archive to be processed by filebeat
logger.info(f"{scriptName}:\t🖅\t{pathname} ({fileMime}/{fileType}) to {zeekDir}")
logger.info(f"{scriptName}:\t🖅\t{pathname} [{fileMime}][{fileType}] to {zeekDir}")
shutil.move(pathname, os.path.join(zeekDir, os.path.basename(pathname)))

else:
Expand Down
137 changes: 38 additions & 99 deletions shared/bin/pcap_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
ARKIME_FILE_SIZE_FIELD = "filesize"

###################################################################################################
debug = False
verboseDebug = False
pdbFlagged = False
args = None
opensearchHttpAuth = None
Expand All @@ -73,15 +71,14 @@
###################################################################################################
# watch files written to and moved to this directory
class EventWatcher:
def __init__(self):
def __init__(self, logger=None):
global args
global opensearchHttpAuth
global debug
global verboseDebug
global shuttingDown

super().__init__()

self.logger = logger if logger else logging
self.useOpenSearch = False
self.openSearchClient = None

Expand All @@ -94,8 +91,7 @@ def __init__(self):
while (not connected) and (not shuttingDown[0]):
try:
try:
if debug:
eprint(f"{scriptName}:\tconnecting to OpenSearch {args.opensearchUrl}...")
self.logger.info(f"{scriptName}:\tconnecting to OpenSearch {args.opensearchUrl}...")

self.openSearchClient = OpenSearch(
hosts=[args.opensearchUrl],
Expand All @@ -106,16 +102,14 @@ def __init__(self):
request_timeout=1,
)

if verboseDebug:
eprint(f"{scriptName}:\t{self.openSearchClient.cluster.health()}")
self.logger.debug(f"{scriptName}:\t{self.openSearchClient.cluster.health()}")

self.openSearchClient.cluster.health(
wait_for_status='red',
request_timeout=1,
)

if verboseDebug:
eprint(f"{scriptName}:\t{self.openSearchClient.cluster.health()}")
self.logger.debug(f"{scriptName}:\t{self.openSearchClient.cluster.health()}")

connected = self.openSearchClient is not None
if not connected:
Expand All @@ -127,12 +121,12 @@ def __init__(self):
ConnectionRefusedError,
NewConnectionError,
) as connError:
if debug:
eprint(f"{scriptName}:\tOpenSearch connection error: {connError}")
self.logger.error(f"{scriptName}:\tOpenSearch connection error: {connError}")

except Exception as genericError:
if debug:
eprint(f"{scriptName}:\tUnexpected exception while connecting to OpenSearch: {genericError}")
self.logger.error(
f"{scriptName}:\tUnexpected exception while connecting to OpenSearch: {genericError}"
)

if (not connected) and args.opensearchWaitForHealth:
time.sleep(1)
Expand All @@ -144,14 +138,12 @@ def __init__(self):
# if requested, wait for at least "yellow" health in the cluster for the "files" index
while connected and args.opensearchWaitForHealth and (not healthy) and (not shuttingDown[0]):
try:
if debug:
eprint(f"{scriptName}:\twaiting for OpenSearch to be healthy")
self.logger.info(f"{scriptName}:\twaiting for OpenSearch to be healthy")
self.openSearchClient.cluster.health(
index=ARKIME_FILES_INDEX,
wait_for_status='yellow',
)
if verboseDebug:
eprint(f"{scriptName}:\t{self.openSearchClient.cluster.health()}")
self.logger.debug(f"{scriptName}:\t{self.openSearchClient.cluster.health()}")
healthy = True

except (
Expand All @@ -160,8 +152,7 @@ def __init__(self):
ConnectionRefusedError,
NewConnectionError,
) as connError:
if verboseDebug:
eprint(f"{scriptName}:\tOpenSearch health check: {connError}")
self.logger.debug(f"{scriptName}:\tOpenSearch health check: {connError}")

if not healthy:
time.sleep(1)
Expand All @@ -172,27 +163,22 @@ def __init__(self):
self.context = zmq.Context()

# Socket to send messages on
if debug:
eprint(f"{scriptName}:\tbinding publisher port {PCAP_TOPIC_PORT}")
self.logger.info(f"{scriptName}:\tbinding publisher port {PCAP_TOPIC_PORT}")
self.topic_socket = self.context.socket(zmq.PUB)
self.topic_socket.bind(f"tcp://*:{PCAP_TOPIC_PORT}")

# todo: do I want to set this? probably not since this guy's whole job is to send
# and if he can't then what's the point? just block
# self.topic_socket.SNDTIMEO = 5000

if debug:
eprint(f"{scriptName}:\tEventWatcher initialized")
self.logger.info(f"{scriptName}:\tEventWatcher initialized")

###################################################################################################
# set up event processor to append processed events from to the event queue
def processFile(self, pathname):
global args
global debug
global verboseDebug

if debug:
eprint(f"{scriptName}:\t👓\t{pathname}")
self.logger.info(f"{scriptName}:\t👓\t{pathname}")

# the entity must be a regular PCAP file and actually exist
if os.path.isfile(pathname):
Expand Down Expand Up @@ -224,13 +210,11 @@ def processFile(self, pathname):

if fileIsDuplicate:
# this is duplicate file (it's been processed before) so ignore it
if debug:
eprint(f"{scriptName}:\t📋\t{pathname}")
self.logger.info(f"{scriptName}:\t📋\t{pathname}")

else:
# the entity is a right-sized non-duplicate file, and it exists, so send it to get processed
if debug:
eprint(f"{scriptName}:\t📩\t{pathname}")
self.logger.info(f"{scriptName}:\t📩\t{pathname}")
try:
fileInfo = {
FILE_INFO_DICT_NAME: pathname if args.includeAbsolutePath else relativePath,
Expand All @@ -241,16 +225,13 @@ def processFile(self, pathname):
FILE_INFO_DICT_TAGS: tags_from_filename(relativePath),
}
self.topic_socket.send_string(json.dumps(fileInfo))
if debug:
eprint(f"{scriptName}:\t📫\t{fileInfo}")
self.logger.info(f"{scriptName}:\t📫\t{fileInfo}")
except zmq.Again:
if verboseDebug:
eprint(f"{scriptName}:\t🕑\t{pathname}")
self.logger.debug(f"{scriptName}:\t🕑\t{pathname}")

else:
# too small/big to care about, or the wrong type, ignore it
if debug:
eprint(f"{scriptName}:\t\t{pathname}")
self.logger.info(f"{scriptName}:\t\t{pathname}")


def file_processor(pathname, **kwargs):
Expand All @@ -272,51 +253,16 @@ def pdb_handler(sig, frame):
pdbFlagged = True


###################################################################################################
# handle sigusr2 for toggling debug
def debug_toggle_handler(signum, frame):
global debug
global debugToggled
debug = not debug
debugToggled = True


###################################################################################################
# main
def main():
global args
global opensearchHttpAuth
global debug
global verboseDebug
global debugToggled
global pdbFlagged
global shuttingDown

parser = argparse.ArgumentParser(description=scriptName, add_help=False, usage='{} <arguments>'.format(scriptName))
parser.add_argument(
'-v',
'--verbose',
dest='debug',
help="Verbose output",
metavar='true|false',
type=str2bool,
nargs='?',
const=True,
default=False,
required=False,
)
parser.add_argument(
'--extra-verbose',
dest='verboseDebug',
help="Super verbose output",
metavar='true|false',
type=str2bool,
nargs='?',
const=True,
default=False,
required=False,
)

parser.add_argument('--verbose', '-v', action='count', default=1, help='Increase verbosity (e.g., -v, -vv, etc.)')
parser.add_argument(
'--min-bytes',
dest='minBytes',
Expand Down Expand Up @@ -465,17 +411,16 @@ def main():
parser.print_help()
exit(2)

verboseDebug = args.verboseDebug
debug = args.debug or verboseDebug
if debug:
eprint(os.path.join(scriptPath, scriptName))
eprint("{} arguments: {}".format(scriptName, sys.argv[1:]))
eprint("{} arguments: {}".format(scriptName, args))
else:
args.verbose = logging.ERROR - (10 * args.verbose) if args.verbose > 0 else 0
logging.basicConfig(
level=args.verbose, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
)
logging.info(os.path.join(scriptPath, scriptName))
logging.info("Arguments: {}".format(sys.argv[1:]))
logging.info("Arguments: {}".format(args))
if args.verbose > logging.DEBUG:
sys.tracebacklimit = 0

logging.basicConfig(level=logging.ERROR)

args.opensearchIsLocal = args.opensearchIsLocal or (args.opensearchUrl == 'http://opensearch:9200')
opensearchCreds = (
ParseCurlFile(args.opensearchCurlRcFile) if (not args.opensearchIsLocal) else defaultdict(lambda: None)
Expand All @@ -493,7 +438,6 @@ def main():
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGUSR1, pdb_handler)
signal.signal(signal.SIGUSR2, debug_toggle_handler)

# sleep for a bit if requested
sleepCount = 0
Expand All @@ -506,8 +450,7 @@ def main():
preexistingDir = True
else:
preexistingDir = False
if debug:
eprint(f'{scriptName}:\tcreating "{args.baseDir}" to monitor')
logging.info(f'{scriptName}:\tcreating "{args.baseDir}" to monitor')
pathlib.Path(args.baseDir).mkdir(parents=False, exist_ok=True)

# if recursion was requested, get list of directories to monitor
Expand All @@ -527,14 +470,12 @@ def main():
polling=args.polling,
)
for watchDir in watchDirs:
if verboseDebug:
eprint(f"{scriptName}:\tScheduling {watchDir}")
logging.debug(f"{scriptName}:\tScheduling {watchDir}")
observer.schedule(handler, watchDir, recursive=False)

observer.start()

if debug:
eprint(f"{scriptName}:\tmonitoring {watchDirs}")
logging.info(f"{scriptName}:\tmonitoring {watchDirs}")

try:
time.sleep(2)
Expand All @@ -548,8 +489,8 @@ def main():
]:
touch(preexistingFile)
filesTouched += 1
if debug and (filesTouched > 0):
eprint(f"{scriptName}:\tfound {filesTouched} preexisting files to check")
if filesTouched > 0:
logging.info(f"{scriptName}:\tfound {filesTouched} preexisting files to check")

# start the thread to actually handle the files as they're queued by the FileOperationEventHandler handler
workerThreadCount = malcolm_utils.AtomicInt(value=0)
Expand All @@ -560,11 +501,11 @@ def main():
handler,
observer,
file_processor,
{'watcher': EventWatcher()},
{'watcher': EventWatcher(logger=logging)},
args.assumeClosedSec,
workerThreadCount,
shuttingDown,
None,
logging,
],
),
)
Expand All @@ -577,8 +518,7 @@ def main():
observer.join(1)

# graceful shutdown
if debug:
eprint(f"{scriptName}:\tshutting down...")
logging.info(f"{scriptName}:\tshutting down...")

if shuttingDown[0]:
raise WatchdogShutdown()
Expand All @@ -594,8 +534,7 @@ def main():
while workerThreadCount.value() > 0:
time.sleep(1)

if debug:
eprint(f"{scriptName}:\tfinished monitoring {watchDirs}")
logging.info(f"{scriptName}:\tfinished monitoring {watchDirs}")


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit be05fab

Please sign in to comment.