Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring OSS Item classes #121

Merged
merged 7 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ py-tlsh
pytz
XlsxWriter
PyYAML
fosslight_util~=1.4.47
fosslight_util>=2.0.0
dependency-check
188 changes: 61 additions & 127 deletions src/fosslight_binary/_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,10 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 LG Electronics Inc.
# SPDX-License-Identifier: Apache-2.0
import hashlib
import tlsh
from io import open
from fosslight_util.oss_item import FileItem

_EXCLUDE_TRUE_VALUE = "Exclude"
_TLSH_CHECKSUM_NULL = "0"


class OssItem:
name = ""
version = ""
license = ""
dl_url = ""
comment = ""
exclude = False

def __init__(self, name, version, license, dl_url=""):
self.name = name
self.version = version
self.license = license
self.dl_url = dl_url
self.exclude = False
self.comment = ""

def set_comment(self, value):
if self.comment:
self.comment = f"{self.comment} / {value}"
else:
self.comment = value

def set_exclude(self, value):
self.exclude = value

def get_comment(self):
return self.comment
EXCLUDE_TRUE_VALUE = "Exclude"
TLSH_CHECKSUM_NULL = "0"


class VulnerabilityItem:
Expand All @@ -50,119 +19,84 @@ def __init__(self, file_path, id, url):
self.nvd_url = url


class BinaryItem:
bin_name = ""
binary_name_without_path = ""
binary_strip_root = "" # Value of binary name column
tlsh = _TLSH_CHECKSUM_NULL
checksum = _TLSH_CHECKSUM_NULL
oss_items = []
vulnerability_items = []
exclude = False
comment = ""
found_in_owasp = False

class BinaryItem(FileItem):
def __init__(self, value):
super().__init__("")
self.exclude = False
self.binary_strip_root = ""
self.checksum = _TLSH_CHECKSUM_NULL
self.tlsh = _TLSH_CHECKSUM_NULL
self.oss_items = []
self.source_name_or_path = ""
self.checksum = TLSH_CHECKSUM_NULL
self.tlsh = TLSH_CHECKSUM_NULL
self.vulnerability_items = []
self.binary_name_without_path = ""
self.set_bin_name(value)
self.bin_name_with_path = value
self.found_in_owasp = False
self.is_binary = True

def __del__(self):
pass

def set_oss_items(self, new_oss_list, exclude=False, exclude_msg=""):
if exclude:
for oss in new_oss_list:
oss.set_exclude(True)
oss.set_comment(exclude_msg)
oss.exclude = True
oss.comment = exclude_msg
# Append New input OSS
self.oss_items.extend(new_oss_list)

def get_oss_items(self):
return self.oss_items

def set_vulnerability_items(self, vul_list):
if vul_list is not None:
self.vulnerability_items.extend(vul_list)

def get_vulnerability_items(self):
nvd_url = [vul_item.nvd_url for vul_item in self.vulnerability_items]
return ", ".join(nvd_url)

def set_comment(self, value):
if self.comment:
self.comment = f"{self.comment} / {value}"
else:
self.comment = value

def set_bin_name(self, value):
self.bin_name = value

def set_exclude(self, value):
self.exclude = value

def set_checksum(self, value):
self.checksum = value

def set_tlsh(self, value):
self.tlsh = value

def get_comment(self):
return self.comment

def get_print_binary_only(self):
return (self.binary_strip_root + "\t" + self.checksum + "\t" + self.tlsh)
return (self.source_name_or_path + "\t" + self.checksum + "\t" + self.tlsh)

def get_oss_report(self):
comment = ""
if len(self.oss_items) > 0:
def get_print_array(self):
items = []
if self.oss_items:
for oss in self.oss_items:
exclude = _EXCLUDE_TRUE_VALUE if (self.exclude or oss.exclude) else ""
lic = ",".join(oss.license)
exclude = EXCLUDE_TRUE_VALUE if (self.exclude or oss.exclude) else ""
nvd_url = self.get_vulnerability_items()

if self.comment:
if oss.comment:
comment = f"{self.comment} / {oss.comment}"
else:
comment = self.comment
else:
comment = oss.comment

yield [self.binary_strip_root, oss.name, oss.version,
oss.license, oss.dl_url, '', '', exclude, comment,
nvd_url, self.tlsh, self.checksum]
items.append([self.source_name_or_path, oss.name, oss.version,
lic, oss.download_location, oss.homepage,
oss.copyright, exclude, oss.comment,
nvd_url, self.tlsh, self.checksum])
else:
exclude = _EXCLUDE_TRUE_VALUE if self.exclude else ""
yield [self.binary_strip_root, '',
'', '', '', '', '', exclude, self.comment, '', self.tlsh, self.checksum]

def set_checksum_tlsh(self):
self.checksum, self.tlsh, error, msg = get_checksum_and_tlsh(
self.bin_name)
return error, msg


def get_checksum_and_tlsh(bin_with_path):
checksum_value = _TLSH_CHECKSUM_NULL
tlsh_value = _TLSH_CHECKSUM_NULL
error_msg = ""
error = False
try:
f = open(bin_with_path, "rb")
byte = f.read()
sha1_hash = hashlib.sha1(byte)
checksum_value = str(sha1_hash.hexdigest())
try:
tlsh_value = str(tlsh.hash(byte))
except:
tlsh_value = _TLSH_CHECKSUM_NULL
f.close()
except Exception as ex:
error_msg = f"(Error) Get_checksum, tlsh: {ex}"
error = True
return checksum_value, tlsh_value, error, error_msg
exclude = EXCLUDE_TRUE_VALUE if self.exclude else ""
items.append([self.source_name_or_path, '',
'', '', '', '', '', exclude, self.comment, '',
self.tlsh, self.checksum])
return items

def get_print_json(self):
items = []
if self.oss_items:
for oss in self.oss_items:
json_item = {}
json_item["name"] = oss.name
json_item["version"] = oss.version

if self.source_name_or_path:
json_item["source path"] = self.source_name_or_path
if len(oss.license) > 0:
json_item["license"] = oss.license
if oss.download_location:
json_item["download location"] = oss.download_location
if oss.homepage:
json_item["homepage"] = oss.homepage
if oss.copyright:
json_item["copyright text"] = oss.copyright
if self.exclude or oss.exclude:
json_item["exclude"] = True
if oss.comment:
json_item["comment"] = oss.comment
items.append(json_item)
else:
json_item = {}
if self.source_name_or_path:
json_item["source path"] = self.source_name_or_path
if self.exclude:
json_item["exclude"] = True
if self.comment:
json_item["comment"] = self.comment
return items
9 changes: 5 additions & 4 deletions src/fosslight_binary/_binary_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import psycopg2
import pandas as pd
from urllib.parse import urlparse
from ._binary import _TLSH_CHECKSUM_NULL, OssItem
from ._binary import TLSH_CHECKSUM_NULL
from fosslight_util.oss_item import OssItem
import fosslight_util.constant as constant

columns = ['filename', 'pathname', 'checksum', 'tlshchecksum', 'ossname',
Expand Down Expand Up @@ -43,10 +44,10 @@ def get_oss_info_from_db(bin_info_list, dburl=""):
if not item.found_in_owasp:
oss_from_db = OssItem(row['ossname'], row['ossversion'], row['license'])
bin_oss_items.append(oss_from_db)
item.set_comment("Binary DB result")

if bin_oss_items:
item.set_oss_items(bin_oss_items)
item.comment = "Binary DB result"

disconnect_lge_bin_db()
return bin_info_list, _cnt_auto_identified
Expand Down Expand Up @@ -97,14 +98,14 @@ def get_oss_info_by_tlsh_and_filename(file_name, checksum_value, tlsh_value):
sql_statement_filename, ['tlshchecksum'])
if df_result is None or len(df_result) <= 0:
final_result_item = ""
elif tlsh_value == _TLSH_CHECKSUM_NULL: # Couldn't get the tlsh of a file.
elif tlsh_value == TLSH_CHECKSUM_NULL: # Couldn't get the tlsh of a file.
final_result_item = ""
else:
matched_tlsh = ""
matched_tlsh_diff = -1
for row in df_result.tlshchecksum:
try:
if row != _TLSH_CHECKSUM_NULL:
if row != TLSH_CHECKSUM_NULL:
tlsh_diff = tlsh.diff(row, tlsh_value)
if tlsh_diff <= 120: # MATCHED
if (matched_tlsh_diff < 0) or (tlsh_diff < matched_tlsh_diff):
Expand Down
13 changes: 7 additions & 6 deletions src/fosslight_binary/_jar_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import os
import sys
import fosslight_util.constant as constant
from ._binary import BinaryItem, OssItem, VulnerabilityItem
from ._binary import BinaryItem, VulnerabilityItem
from fosslight_util.oss_item import OssItem
from dependency_check import run as dependency_check_run


Expand Down Expand Up @@ -63,21 +64,21 @@ def merge_binary_list(owasp_items, vulnerability_items, bin_list):
for key, value in owasp_items.items():
found = False
for bin in bin_list:
if bin.binary_strip_root == key:
if bin.source_name_or_path == key:
for oss in value:
if oss.name and oss.license:
bin.found_in_owasp = True
break
bin.set_oss_items(value)
if vulnerability_items is not None:
bin.set_vulnerability_items(vulnerability_items.get(key))
if vulnerability_items and vulnerability_items.get(key):
bin.vulnerability_items.extend(vulnerability_items.get(key))
found = True
break

if not found:
bin_item = BinaryItem(os.path.abspath(key))
bin_item.binary_name_without_path = os.path.basename(key)
bin_item.binary_strip_root = key
bin_item.source_name_or_path = key
bin_item.set_oss_items(value)
not_found_bin.append(bin_item)

Expand Down Expand Up @@ -261,7 +262,7 @@ def analyze_jar_file(path_to_find_bin, path_to_exclude):

if oss_name != "" or oss_ver != "" or oss_license != "" or oss_dl_url != "":
oss = OssItem(oss_name, oss_ver, oss_license, oss_dl_url)
oss.set_comment("OWASP result")
oss.comment = "OWASP result"

remove_owasp_item = owasp_items.get(file_with_path)
if remove_owasp_item:
Expand Down
Loading
Loading