Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 77 additions & 53 deletions vulnerabilities/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ def from_dict(cls, data: dict):
@dataclasses.dataclass(eq=True)
@functools.total_ordering
class PatchData:
patch_url: Optional[str] = None
patch_text: Optional[str] = None
patch_url: Optional[str] = ""
patch_text: Optional[str] = ""
patch_checksum: Optional[str] = dataclasses.field(init=False, default=None)

def __post_init__(self):
Expand All @@ -271,9 +271,9 @@ def __lt__(self, other):

def _cmp_key(self):
return (
self.patch_url,
self.patch_text,
self.patch_checksum,
self.patch_url or "",
self.patch_text or "",
self.patch_checksum or "",
)

def to_dict(self) -> dict:
Expand Down Expand Up @@ -556,23 +556,63 @@ def from_dict(cls, affected_pkg: dict):
class AdvisoryData:
"""
This data class expresses the contract between data sources and the import runner.
"""

aliases: List[str] = dataclasses.field(default_factory=list)
summary: Optional[str] = ""
affected_packages: List[AffectedPackage] = dataclasses.field(default_factory=list)
references: List[Reference] = dataclasses.field(default_factory=list)
date_published: Optional[datetime.datetime] = None
weaknesses: List[int] = dataclasses.field(default_factory=list)
url: Optional[str] = None

def __post_init__(self):
if self.summary:
self.summary = clean_summary(self.summary)

If a vulnerability_id is present then:
summary or affected_packages or references must be present
otherwise
either affected_package or references should be present
def to_dict(self):
return {
"aliases": self.aliases,
"summary": self.summary,
"affected_packages": [pkg.to_dict() for pkg in self.affected_packages],
"references": [ref.to_dict() for ref in self.references],
"date_published": self.date_published.isoformat() if self.date_published else None,
"weaknesses": self.weaknesses,
"url": self.url if self.url else "",
}

date_published must be aware datetime
@classmethod
def from_dict(cls, advisory_data):
date_published = advisory_data["date_published"]
transformed = {
"aliases": advisory_data["aliases"],
"summary": advisory_data["summary"],
"affected_packages": [
AffectedPackage.from_dict(pkg)
for pkg in advisory_data["affected_packages"]
if pkg is not None
],
"references": [Reference.from_dict(ref) for ref in advisory_data["references"]],
"date_published": datetime.datetime.fromisoformat(date_published)
if date_published
else None,
"weaknesses": advisory_data["weaknesses"],
"url": advisory_data.get("url") or None,
}
return cls(**transformed)


@dataclasses.dataclass(order=True)
class AdvisoryDataV2:
"""
This data class expresses the contract between data sources and the import runner.
"""

advisory_id: str = ""
aliases: List[str] = dataclasses.field(default_factory=list)
summary: Optional[str] = ""
affected_packages: Union[List[AffectedPackage], List[AffectedPackageV2]] = dataclasses.field(
default_factory=list
)
references: List[Reference] = dataclasses.field(default_factory=list)
references_v2: List[ReferenceV2] = dataclasses.field(default_factory=list)
affected_packages: List[AffectedPackageV2] = dataclasses.field(default_factory=list)
references: List[ReferenceV2] = dataclasses.field(default_factory=list)
patches: List[PatchData] = dataclasses.field(default_factory=list)
date_published: Optional[datetime.datetime] = None
weaknesses: List[int] = dataclasses.field(default_factory=list)
Expand All @@ -581,46 +621,24 @@ class AdvisoryData:
original_advisory_text: Optional[str] = None

def __post_init__(self):
if not self.advisory_id:
raise ValueError("advisory_id is required for AdvisoryDataV2")
if self.advisory_id and self.advisory_id in self.aliases:
raise ValueError(
f"advisory_id {self.advisory_id} should not be present in aliases {self.aliases}"
)
if self.summary:
self.summary = self.clean_summary(self.summary)

def clean_summary(self, summary):
# https://nvd.nist.gov/vuln/detail/CVE-2013-4314
# https://github.com/cms-dev/cms/issues/888#issuecomment-516977572
summary = summary.strip()
if summary:
summary = summary.replace("\x00", "\uFFFD")
return summary
self.summary = clean_summary(self.summary)

def to_dict(self):
is_adv_v2 = (
self.advisory_id
or self.severities
or self.references_v2
or (self.affected_packages and isinstance(self.affected_packages[0], AffectedPackageV2))
)
if is_adv_v2:
return {
"advisory_id": self.advisory_id,
"aliases": self.aliases,
"summary": self.summary,
"affected_packages": [pkg.to_dict() for pkg in self.affected_packages],
"references_v2": [ref.to_dict() for ref in self.references_v2],
"patches": [patch.to_dict() for patch in self.patches],
"severities": [sev.to_dict() for sev in self.severities],
"date_published": self.date_published.isoformat() if self.date_published else None,
"weaknesses": self.weaknesses,
"url": self.url if self.url else "",
}
return {
"advisory_id": self.advisory_id,
"aliases": self.aliases,
"summary": self.summary,
"affected_packages": [pkg.to_dict() for pkg in self.affected_packages],
"references": [ref.to_dict() for ref in self.references],
"patches": [patch.to_dict() for patch in self.patches],
"severities": [sev.to_dict() for sev in self.severities],
"date_published": self.date_published.isoformat() if self.date_published else None,
"weaknesses": self.weaknesses,
"url": self.url if self.url else "",
Expand All @@ -629,31 +647,37 @@ def to_dict(self):
@classmethod
def from_dict(cls, advisory_data):
date_published = advisory_data["date_published"]
affected_packages = advisory_data["affected_packages"]
affected_package_cls = AffectedPackage
if affected_packages:
affected_package_cls = (
AffectedPackageV2
if "fixed_version_range" in affected_packages[0]
else AffectedPackage
)
transformed = {
"aliases": advisory_data["aliases"],
"summary": advisory_data["summary"],
"affected_packages": [
affected_package_cls.from_dict(pkg) for pkg in affected_packages if pkg is not None
AffectedPackageV2.from_dict(pkg)
for pkg in advisory_data["affected_packages"]
if pkg is not None
],
"patches": [PatchData.from_dict(patch) for patch in advisory_data.get("patches", [])],
"references": [Reference.from_dict(ref) for ref in advisory_data["references"]],
"references": [ReferenceV2.from_dict(ref) for ref in advisory_data["references"]],
"date_published": datetime.datetime.fromisoformat(date_published)
if date_published
else None,
"weaknesses": advisory_data["weaknesses"],
"severities": [
VulnerabilitySeverity.from_dict(sev) for sev in advisory_data.get("severities", [])
],
"url": advisory_data.get("url") or None,
}
return cls(**transformed)


def clean_summary(summary):
    """
    Return ``summary`` stripped of surrounding whitespace, with every NUL
    character (``\\x00``) replaced by the Unicode replacement character
    U+FFFD, since NUL bytes break downstream storage/display.

    See https://nvd.nist.gov/vuln/detail/CVE-2013-4314 and
    https://github.com/cms-dev/cms/issues/888#issuecomment-516977572
    """
    stripped = summary.strip()
    if not stripped:
        return stripped
    return stripped.replace("\x00", "\uFFFD")


class NoLicenseError(Exception):
    """
    Raised when a required license is not found.

    NOTE(review): the exact trigger (presumably a fetched data source or
    repository missing license information) is not visible here — confirm
    against the call sites that raise this exception.
    """

    pass

Expand Down
2 changes: 1 addition & 1 deletion vulnerabilities/importers/curl.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def parse_advisory_data(raw_data) -> AdvisoryData:
... ]
... }
>>> parse_advisory_data(raw_data)
AdvisoryData(advisory_id='', aliases=['CVE-2024-2379'], summary='QUIC certificate check bypass with wolfSSL', affected_packages=[AffectedPackage(package=PackageURL(type='generic', namespace='curl.se', name='curl', version=None, qualifiers={}, subpath=None), affected_version_range=GenericVersionRange(constraints=(VersionConstraint(comparator='=', version=SemverVersion(string='8.6.0')),)), fixed_version=SemverVersion(string='8.7.0'))], references=[Reference(reference_id='', reference_type='', url='https://curl.se/docs/CVE-2024-2379.html', severities=[VulnerabilitySeverity(system=Cvssv3ScoringSystem(identifier='cvssv3.1', name='CVSSv3.1 Base Score', url='https://www.first.org/cvss/v3-1/', notes='CVSSv3.1 base score and vector'), value='Low', scoring_elements='', published_at=None, url=None)]), Reference(reference_id='', reference_type='', url='https://hackerone.com/reports/2410774', severities=[])], references_v2=[], patches=[], date_published=datetime.datetime(2024, 3, 27, 8, 0, tzinfo=datetime.timezone.utc), weaknesses=[297], severities=[], url='https://curl.se/docs/CVE-2024-2379.json', original_advisory_text=None)
AdvisoryData(aliases=['CVE-2024-2379'], summary='QUIC certificate check bypass with wolfSSL', affected_packages=[AffectedPackage(package=PackageURL(type='generic', namespace='curl.se', name='curl', version=None, qualifiers={}, subpath=None), affected_version_range=GenericVersionRange(constraints=(VersionConstraint(comparator='=', version=SemverVersion(string='8.6.0')),)), fixed_version=SemverVersion(string='8.7.0'))], references=[Reference(reference_id='', reference_type='', url='https://curl.se/docs/CVE-2024-2379.html', severities=[VulnerabilitySeverity(system=Cvssv3ScoringSystem(identifier='cvssv3.1', name='CVSSv3.1 Base Score', url='https://www.first.org/cvss/v3-1/', notes='CVSSv3.1 base score and vector'), value='Low', scoring_elements='', published_at=None, url=None)]), Reference(reference_id='', reference_type='', url='https://hackerone.com/reports/2410774', severities=[])], date_published=datetime.datetime(2024, 3, 27, 8, 0, tzinfo=datetime.timezone.utc), weaknesses=[297], url='https://curl.se/docs/CVE-2024-2379.json')
"""

affected = get_item(raw_data, "affected")[0] if len(get_item(raw_data, "affected")) > 0 else []
Expand Down
77 changes: 0 additions & 77 deletions vulnerabilities/importers/osv.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,83 +111,6 @@ def parse_advisory_data(
)


def parse_advisory_data_v2(
    raw_data: dict, supported_ecosystems, advisory_url: str, advisory_text: str
) -> Optional[AdvisoryData]:
    """
    Return an AdvisoryData built from ``raw_data``, a mapping of OSV advisory
    data, keeping only affected packages whose purl type appears in
    ``supported_ecosystems``. Return None if the OSV record has no "id".

    ``advisory_url`` is stored as the advisory URL; ``advisory_text`` (or a
    JSON dump of ``raw_data`` when empty) is kept as the original text.
    """
    advisory_id = raw_data.get("id") or ""
    if not advisory_id:
        logger.error(f"Missing advisory id in OSV data: {raw_data}")
        return None

    # Merge the short summary and the longer details into one description.
    description = build_description(
        summary=raw_data.get("summary") or "",
        description=raw_data.get("details") or "",
    )

    aliases = raw_data.get("aliases") or []
    # The advisory's own id must not also appear among its aliases.
    if advisory_id in aliases:
        aliases.remove(advisory_id)

    date_published = get_published_date(raw_data=raw_data)
    severities = list(get_severities(raw_data=raw_data))
    references = get_references_v2(raw_data=raw_data)

    affected_packages = []
    for affected_pkg in raw_data.get("affected") or []:
        purl = get_affected_purl(affected_pkg=affected_pkg, raw_id=advisory_id)
        if not purl or purl.type not in supported_ecosystems:
            logger.error(f"Unsupported package type: {affected_pkg!r} in OSV: {advisory_id!r}")
            continue

        affected_version_range = get_affected_version_range(
            affected_pkg=affected_pkg,
            raw_id=advisory_id,
            supported_ecosystem=purl.type,
        )

        # Collect every fixed version string across all declared ranges,
        # then fold them into a single fixed version range when any exist.
        fixed_versions = [
            version.string
            for fixed_range in affected_pkg.get("ranges") or []
            for version in get_fixed_versions(
                fixed_range=fixed_range, raw_id=advisory_id, supported_ecosystem=purl.type
            )
        ]
        fixed_version_range = (
            get_fixed_version_range(fixed_versions, purl.type) if fixed_versions else None
        )

        # Only keep packages carrying at least one usable version range.
        if affected_version_range or fixed_version_range:
            affected_packages.append(
                AffectedPackageV2(
                    package=purl,
                    affected_version_range=affected_version_range,
                    fixed_version_range=fixed_version_range,
                )
            )

    # CWE ids live under the OSV "database_specific" mapping.
    cwe_ids = (raw_data.get("database_specific") or {}).get("cwe_ids") or []
    weaknesses = [get_cwe_id(cwe_id) for cwe_id in cwe_ids]

    return AdvisoryData(
        advisory_id=advisory_id,
        aliases=aliases,
        summary=description,
        references_v2=references,
        severities=severities,
        affected_packages=affected_packages,
        date_published=date_published,
        weaknesses=weaknesses,
        url=advisory_url,
        original_advisory_text=advisory_text or json.dumps(raw_data, indent=2, ensure_ascii=False),
    )


def extract_fixed_versions(fixed_range) -> Iterable[str]:
"""
Return a list of fixed version strings given a ``fixed_range`` mapping of
Expand Down
15 changes: 11 additions & 4 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

import vulnerablecode
from vulnerabilities import utils
from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.severity_systems import EPSS
from vulnerabilities.severity_systems import SCORING_SYSTEMS
from vulnerabilities.utils import compute_patch_checksum
Expand Down Expand Up @@ -2987,6 +2988,12 @@ class AdvisoryV2(models.Model):
help_text="Weighted severity is the highest value calculated by multiplying each severity by its corresponding weight, divided by 10.",
)

# precedence = models.IntegerField(
# null=True,
# blank=True,
# help_text="Precedence indicates the priority level of addressing a vulnerability based on its overall risk",
# )

@property
def risk_score(self):
"""
Expand Down Expand Up @@ -3026,17 +3033,17 @@ def get_absolute_url(self):
"""
return reverse("advisory_details", args=[self.avid])

def to_advisory_data(self) -> "AdvisoryData":
from vulnerabilities.importer import AdvisoryData
def to_advisory_data(self) -> "AdvisoryDataV2":
from vulnerabilities.importer import AdvisoryDataV2

return AdvisoryData(
return AdvisoryDataV2(
advisory_id=self.advisory_id,
aliases=[item.alias for item in self.aliases.all()],
summary=self.summary,
affected_packages=[
impacted.to_affected_package_data() for impacted in self.impacted_packages.all()
],
references_v2=[ref.to_reference_v2_data() for ref in self.references.all()],
references=[ref.to_reference_v2_data() for ref in self.references.all()],
patches=[patch.to_patch_data() for patch in self.patches.all()],
date_published=self.date_published,
weaknesses=[weak.cwe_id for weak in self.weaknesses.all()],
Expand Down
6 changes: 3 additions & 3 deletions vulnerabilities/pipelines/v2_importers/aosp_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import dateparser
from fetchcode.vcs import fetch_via_vcs

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.pipes.advisory import append_patch_classifications
Expand Down Expand Up @@ -100,13 +100,13 @@ def collect_advisories(self):
f"{quote(file_path.name)}"
)

yield AdvisoryData(
yield AdvisoryDataV2(
advisory_id=vulnerability_id,
summary=summary,
affected_packages=affected_packages,
severities=severities,
patches=patches,
references_v2=references,
references=references,
date_published=date_published,
url=url,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from univers.version_range import ApacheVersionRange
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
Expand Down Expand Up @@ -223,7 +223,7 @@ class ApacheHTTPDImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
def steps(cls):
return (cls.collect_and_store_advisories,)

def collect_advisories(self) -> Iterable[AdvisoryData]:
def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
if not self.links:
self.links = fetch_links(self.base_url)
for link in self.links:
Expand Down Expand Up @@ -301,12 +301,12 @@ def to_advisory(self, data):

weaknesses = get_weaknesses(data)

return AdvisoryData(
return AdvisoryDataV2(
advisory_id=alias,
aliases=[],
summary=description or "",
affected_packages=affected_packages,
references_v2=[reference],
references=[reference],
weaknesses=weaknesses,
url=reference.url,
severities=severities,
Expand Down
Loading
Loading