Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring OSS Item classes #121

Merged
merged 7 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 51 additions & 118 deletions src/fosslight_binary/_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,10 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020 LG Electronics Inc.
# SPDX-License-Identifier: Apache-2.0
import hashlib
import tlsh
from io import open
from fosslight_util.oss_item import FileItem

_EXCLUDE_TRUE_VALUE = "Exclude"
_TLSH_CHECKSUM_NULL = "0"


class OssItem:
name = ""
version = ""
license = ""
dl_url = ""
comment = ""
exclude = False

def __init__(self, name, version, license, dl_url=""):
self.name = name
self.version = version
self.license = license
self.dl_url = dl_url
self.exclude = False
self.comment = ""

def set_comment(self, value):
if self.comment:
self.comment = f"{self.comment} / {value}"
else:
self.comment = value

def set_exclude(self, value):
self.exclude = value

def get_comment(self):
return self.comment
EXCLUDE_TRUE_VALUE = "Exclude"
TLSH_CHECKSUM_NULL = "0"


class VulnerabilityItem:
Expand All @@ -50,79 +19,43 @@ def __init__(self, file_path, id, url):
self.nvd_url = url


class BinaryItem:
bin_name = ""
binary_name_without_path = ""
binary_strip_root = "" # Value of binary name column
tlsh = _TLSH_CHECKSUM_NULL
checksum = _TLSH_CHECKSUM_NULL
oss_items = []
vulnerability_items = []
exclude = False
comment = ""
found_in_owasp = False

class BinaryItem(FileItem):
def __init__(self, value):
super().__init__("")
self.exclude = False
self.binary_strip_root = ""
self.checksum = _TLSH_CHECKSUM_NULL
self.tlsh = _TLSH_CHECKSUM_NULL
self.oss_items = []
self.source_name_or_path = ""
self.checksum = TLSH_CHECKSUM_NULL
self.tlsh = TLSH_CHECKSUM_NULL
self.vulnerability_items = []
self.binary_name_without_path = ""
self.set_bin_name(value)
self.bin_name_with_path = value
self.found_in_owasp = False

def __del__(self):
pass

def set_oss_items(self, new_oss_list, exclude=False, exclude_msg=""):
if exclude:
for oss in new_oss_list:
oss.set_exclude(True)
oss.set_comment(exclude_msg)
oss.exclude = True
oss.comment = exclude_msg
# Append New input OSS
self.oss_items.extend(new_oss_list)

def get_oss_items(self):
return self.oss_items

def set_vulnerability_items(self, vul_list):
if vul_list is not None:
self.vulnerability_items.extend(vul_list)


def get_vulnerability_items(self):
nvd_url = [vul_item.nvd_url for vul_item in self.vulnerability_items]
return ", ".join(nvd_url)

def set_comment(self, value):
if self.comment:
self.comment = f"{self.comment} / {value}"
else:
self.comment = value

def set_bin_name(self, value):
self.bin_name = value

def set_exclude(self, value):
self.exclude = value

def set_checksum(self, value):
self.checksum = value

def set_tlsh(self, value):
self.tlsh = value

def get_comment(self):
return self.comment

def get_print_binary_only(self):
return (self.binary_strip_root + "\t" + self.checksum + "\t" + self.tlsh)
return (self.source_name_or_path + "\t" + self.checksum + "\t" + self.tlsh)

def get_oss_report(self):
def get_print_array(self):
items = []
comment = ""
if len(self.oss_items) > 0:
if self.oss_items:
for oss in self.oss_items:
exclude = _EXCLUDE_TRUE_VALUE if (self.exclude or oss.exclude) else ""
lic = ",".join(oss.license)
exclude = EXCLUDE_TRUE_VALUE if (self.exclude or oss.exclude) else ""
nvd_url = self.get_vulnerability_items()

if self.comment:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@soimkim
FileItem에서 comment set할 때, 각 OssItem의 comment 추가될 수 있도록 set함수 구성되어 있습니다.
그냥 comment만 부르도록 수정해주시기 바랍니다.

Copy link
Contributor Author

@soimkim soimkim Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dd-jy , FL Util에서 하기 부분이 비효율적인 것으로 보입니다.
또한 oss_item별로 comment가 다를 수 있는데 하기 메세지로 인해 self.oss_items의 커맨트가 초기화되고, oss_item별로 커맨트를 따로 가져갈 수 없습니다.
for oss in self.oss_items:
oss.comment = value

이에 FL Util에서 하기 사항 수정을 제안드립니다.

  1. FileItem의 comment set할 때, oss_item의 커맨트 정의 빼기
  2. get_print_array에서FileItem의 커맨트와 oss_item의 커맨트 합쳐서 출력 (중복은 제거)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@soimkim
oss_item에서 comment를 부를때, file_item의 comment를 부를 수 없어서 oss_item의 comment를 부를때와 file_item의 comment 부를 때 comment가 달라지는게 별로라 생각해서 oss_item으로 무조건 set하도록 했었는데요.
말씀하신대로 file_item의 comment를 추가하면 comment가 추가된 순서로 출력되지 않아서, correct 모드 같은 경우, 보기가 불편한 것 같습니다.

Expand All @@ -133,36 +66,36 @@ def get_oss_report(self):
else:
comment = oss.comment

yield [self.binary_strip_root, oss.name, oss.version,
oss.license, oss.dl_url, '', '', exclude, comment,
nvd_url, self.tlsh, self.checksum]
items.append([self.source_name_or_path, oss.name, oss.version,
lic, oss.download_location, '', '', exclude, comment,
nvd_url, self.tlsh, self.checksum])
else:
exclude = _EXCLUDE_TRUE_VALUE if self.exclude else ""
yield [self.binary_strip_root, '',
'', '', '', '', '', exclude, self.comment, '', self.tlsh, self.checksum]

def set_checksum_tlsh(self):
self.checksum, self.tlsh, error, msg = get_checksum_and_tlsh(
self.bin_name)
return error, msg


def get_checksum_and_tlsh(bin_with_path):
checksum_value = _TLSH_CHECKSUM_NULL
tlsh_value = _TLSH_CHECKSUM_NULL
error_msg = ""
error = False
try:
f = open(bin_with_path, "rb")
byte = f.read()
sha1_hash = hashlib.sha1(byte)
checksum_value = str(sha1_hash.hexdigest())
try:
tlsh_value = str(tlsh.hash(byte))
except:
tlsh_value = _TLSH_CHECKSUM_NULL
f.close()
except Exception as ex:
error_msg = f"(Error) Get_checksum, tlsh: {ex}"
error = True
return checksum_value, tlsh_value, error, error_msg
exclude = EXCLUDE_TRUE_VALUE if self.exclude else ""
items.append([self.source_name_or_path, '',
'', '', '', '', '', exclude, self.comment, '', self.tlsh, self.checksum])
return items

def get_print_json(self):
items = []

for oss in self.oss_items:
json_item = {}
json_item["name"] = oss.name
json_item["version"] = oss.version

if self.source_name_or_path:
json_item["source path"] = self.source_name_or_path
if len(oss.license) > 0:
json_item["license"] = oss.license
if oss.download_location:
json_item["download location"] = oss.download_location
if oss.homepage:
json_item["homepage"] = oss.homepage
if oss.copyright:
json_item["copyright text"] = oss.copyright
if self.exclude or oss.exclude:
json_item["exclude"] = True
if oss.comment:
json_item["comment"] = oss.comment
items.append(json_item)
return items
9 changes: 5 additions & 4 deletions src/fosslight_binary/_binary_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import psycopg2
import pandas as pd
from urllib.parse import urlparse
from ._binary import _TLSH_CHECKSUM_NULL, OssItem
from ._binary import TLSH_CHECKSUM_NULL
from fosslight_util.oss_item import OssItem
import fosslight_util.constant as constant

columns = ['filename', 'pathname', 'checksum', 'tlshchecksum', 'ossname',
Expand Down Expand Up @@ -43,7 +44,7 @@ def get_oss_info_from_db(bin_info_list, dburl=""):
if not item.found_in_owasp:
oss_from_db = OssItem(row['ossname'], row['ossversion'], row['license'])
bin_oss_items.append(oss_from_db)
item.set_comment("Binary DB result")
item.comment = "Binary DB result"

if bin_oss_items:
item.set_oss_items(bin_oss_items)
Expand Down Expand Up @@ -97,14 +98,14 @@ def get_oss_info_by_tlsh_and_filename(file_name, checksum_value, tlsh_value):
sql_statement_filename, ['tlshchecksum'])
if df_result is None or len(df_result) <= 0:
final_result_item = ""
elif tlsh_value == _TLSH_CHECKSUM_NULL: # Couldn't get the tlsh of a file.
elif tlsh_value == TLSH_CHECKSUM_NULL: # Couldn't get the tlsh of a file.
final_result_item = ""
else:
matched_tlsh = ""
matched_tlsh_diff = -1
for row in df_result.tlshchecksum:
try:
if row != _TLSH_CHECKSUM_NULL:
if row != TLSH_CHECKSUM_NULL:
tlsh_diff = tlsh.diff(row, tlsh_value)
if tlsh_diff <= 120: # MATCHED
if (matched_tlsh_diff < 0) or (tlsh_diff < matched_tlsh_diff):
Expand Down
13 changes: 7 additions & 6 deletions src/fosslight_binary/_jar_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import os
import sys
import fosslight_util.constant as constant
from ._binary import BinaryItem, OssItem, VulnerabilityItem
from ._binary import BinaryItem, VulnerabilityItem
from fosslight_util.oss_item import OssItem
from dependency_check import run as dependency_check_run


Expand Down Expand Up @@ -63,21 +64,21 @@ def merge_binary_list(owasp_items, vulnerability_items, bin_list):
for key, value in owasp_items.items():
found = False
for bin in bin_list:
if bin.binary_strip_root == key:
if bin.source_name_or_path == key:
for oss in value:
if oss.name and oss.license:
bin.found_in_owasp = True
break
bin.set_oss_items(value)
if vulnerability_items is not None:
bin.set_vulnerability_items(vulnerability_items.get(key))
if vulnerability_items and vulnerability_items.get(key):
bin.vulnerability_items.extend(vulnerability_items.get(key))
found = True
break

if not found:
bin_item = BinaryItem(os.path.abspath(key))
bin_item.binary_name_without_path = os.path.basename(key)
bin_item.binary_strip_root = key
bin_item.source_name_or_path = key
bin_item.set_oss_items(value)
not_found_bin.append(bin_item)

Expand Down Expand Up @@ -261,7 +262,7 @@ def analyze_jar_file(path_to_find_bin, path_to_exclude):

if oss_name != "" or oss_ver != "" or oss_license != "" or oss_dl_url != "":
oss = OssItem(oss_name, oss_ver, oss_license, oss_dl_url)
oss.set_comment("OWASP result")
oss.comment = "OWASP result"

remove_owasp_item = owasp_items.get(file_with_path)
if remove_owasp_item:
Expand Down
Loading
Loading