Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions minecode_pipelines/pipelines/mine_alpine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/aboutcode-org/scancode.io for support and download.

from scanpipe.pipes import federatedcode

from minecode_pipelines import pipes
from minecode_pipelines.pipelines import MineCodeBasePipeline

from minecode_pipelines.pipes import alpine
Expand All @@ -28,21 +31,61 @@
class MineAlpine(MineCodeBasePipeline):
    """Mine PackageURLs from alpine index and publish them to FederatedCode."""

    # Git repo that stores shared pipeline configuration and checkpoints.
    pipeline_config_repo = "https://github.com/aboutcode-data/minecode-pipelines-config/"
    # Path of the Alpine checkpoint file inside that config repo.
    checkpoint_path = "alpine/checkpoints.json"

    @classmethod
    def steps(cls):
        # Ordered pipeline steps; fetch_checkpoint must run before mining so
        # already-processed indexes can be skipped, and save_checkpoint runs
        # once more at the end to persist the final state.
        return (
            cls.check_federatedcode_eligibility,
            cls.create_federatedcode_working_dir,
            cls.fetch_federation_config,
            cls.fetch_checkpoint,
            cls.mine_and_publish_alpine_packageurls,
            cls.save_checkpoint,
            cls.delete_working_dir,
        )

    def fetch_checkpoint(self):
        """Fetch the list of already-processed Alpine index URLs."""
        # Clone the config repo into the pipeline working directory; the same
        # clone is reused later when the checkpoint is written back.
        self.checkpoint_config_repo = federatedcode.clone_repository(
            repo_url=self.pipeline_config_repo,
            clone_path=self.working_path / "minecode-pipelines-config",
            logger=self.log,
        )
        checkpoint = pipes.get_checkpoint_from_file(
            cloned_repo=self.checkpoint_config_repo,
            path=self.checkpoint_path,
        )
        # Missing key (e.g. first run) yields an empty set: process everything.
        self.processed_indexes = set(checkpoint.get("processed_indexes", []))
        self.log(
            f"Loaded checkpoint with {len(self.processed_indexes)} "
            f"already-processed indexes."
        )

    def mine_and_publish_alpine_packageurls(self):
        """Mine PackageURLs from the Alpine indexes and publish them,
        skipping indexes recorded in the restored checkpoint."""
        alpine.mine_and_publish_alpine_packageurls(
            data_cluster=self.data_cluster,
            checked_out_repos=self.checked_out_repos,
            working_path=self.working_path,
            commit_msg_func=self.commit_message,
            logger=self.log,
            processed_indexes=self.processed_indexes,
            # Called after each index so an interrupted run can resume.
            checkpoint_func=self._save_checkpoint,
        )

    def _save_checkpoint(self):
        """Save current set of processed indexes as a checkpoint."""
        # Sorted for a stable, diff-friendly JSON file in the config repo.
        checkpoint = {"processed_indexes": sorted(self.processed_indexes)}
        self.log(
            f"Saving checkpoint with {len(self.processed_indexes)} processed indexes."
        )
        pipes.update_checkpoints_in_github(
            checkpoint=checkpoint,
            cloned_repo=self.checkpoint_config_repo,
            path=self.checkpoint_path,
            logger=self.log,
        )

    def save_checkpoint(self):
        """Pipeline step: persist the final checkpoint state."""
        self._save_checkpoint()
33 changes: 30 additions & 3 deletions minecode_pipelines/pipelines/mine_maven.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,54 @@ def fetch_checkpoint_and_maven_index(self):
logger=self.log,
)

# Determine if we can resume from the last processed purl
saved_checksum = checkpoint.get("index_checksum")
current_checksum = self.maven_nexus_collector.index_checksum
self.last_processed_purl = None

if saved_checksum and saved_checksum == current_checksum:
self.last_processed_purl = checkpoint.get("last_processed_purl")
if self.last_processed_purl:
self.log(
f"Index checksum matches. Resuming from: {self.last_processed_purl}"
)
elif saved_checksum and saved_checksum != current_checksum:
self.log(
"Index checksum changed. Starting from beginning."
)

    def mine_and_publish_maven_packageurls(self):
        """Mine PackageURLs from the Maven index and publish them, resuming
        from ``self.last_processed_purl`` when the restored checkpoint's
        index checksum matched the current index."""
        _mine_and_publish_packageurls(
            packageurls=self.maven_nexus_collector.get_packages(
                last_processed_purl=self.last_processed_purl,
            ),
            # Total is unknown up front; the index is streamed lazily.
            total_package_count=None,
            data_cluster=self.data_cluster,
            checked_out_repos=self.checked_out_repos,
            working_path=self.working_path,
            append_purls=self.append_purls,
            commit_msg_func=self.commit_message,
            logger=self.log,
            # Periodically persists progress so interrupted runs can resume.
            checkpoint_func=self._save_checkpoint,
        )

def save_check_point(self):
    def _save_checkpoint(self):
        """Save current progress as a checkpoint."""
        # "nexus.index.last-incremental" identifies the newest incremental
        # index chunk that was fetched from the Nexus index properties.
        last_incremental = self.maven_nexus_collector.index_properties.get(
            "nexus.index.last-incremental"
        )
        checkpoint = {
            "last_incremental": last_incremental,
            # Used on the next run to detect whether the index changed;
            # last_processed_purl is only honored when the checksum matches.
            "index_checksum": self.maven_nexus_collector.index_checksum,
            "last_processed_purl": self.maven_nexus_collector.last_processed_purl,
        }
        self.log(f"Saving checkpoint: {checkpoint}")
        pipes.update_checkpoints_in_github(
            checkpoint=checkpoint,
            cloned_repo=self.checkpoint_config_repo,
            path=self.checkpoint_path,
            logger=self.log,
        )

    def save_check_point(self):
        """Pipeline step: persist the checkpoint (delegates to _save_checkpoint).

        NOTE(review): the name "save_check_point" differs from the Alpine
        pipeline's "save_checkpoint"; kept as-is because the pipeline steps
        tuple (not visible here) presumably references this exact name —
        confirm before renaming.
        """
        self._save_checkpoint()
26 changes: 24 additions & 2 deletions minecode_pipelines/pipes/alpine.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,29 @@ def mine_and_publish_alpine_packageurls(
working_path,
commit_msg_func,
logger,
processed_indexes=None,
checkpoint_func=None,
):
"""Yield PackageURLs from Alpine index."""

index_count = len(ALPINE_LINUX_APKINDEX_URLS)
if processed_indexes is None:
processed_indexes = set()

indexes_to_process = [
url for url in ALPINE_LINUX_APKINDEX_URLS
if url not in processed_indexes
]

index_count = len(indexes_to_process)
total_count = len(ALPINE_LINUX_APKINDEX_URLS)
skipped_count = total_count - index_count

if skipped_count:
logger(
f"Skipping {skipped_count:,d} already-processed indexes. "
f"Processing {index_count:,d} remaining indexes."
)

progress = LoopProgress(
total_iterations=index_count,
logger=logger,
Expand All @@ -563,7 +582,7 @@ def mine_and_publish_alpine_packageurls(

logger(f"Mine PackageURL from {index_count:,d} alpine index.")
alpine_collector = AlpineCollector()
for index in progress.iter(ALPINE_LINUX_APKINDEX_URLS):
for index in progress.iter(indexes_to_process):
logger(f"Mine PackageURL from {index} index.")
_mine_and_publish_packageurls(
packageurls=alpine_collector.get_package_from_index(index),
Expand All @@ -575,3 +594,6 @@ def mine_and_publish_alpine_packageurls(
commit_msg_func=commit_msg_func,
logger=logger,
)
processed_indexes.add(index)
if checkpoint_func:
checkpoint_func()
39 changes: 36 additions & 3 deletions minecode_pipelines/pipes/maven.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#

import gzip
import hashlib
import io
import os
from collections import namedtuple
Expand Down Expand Up @@ -574,6 +575,7 @@ def __init__(
index_location=None,
index_properties_location=None,
last_incremental=None,
last_processed_purl=None,
logger=None,
):
if index_location and last_incremental:
Expand All @@ -584,6 +586,7 @@ def __init__(
)

self.downloads = []
self.last_processed_purl = last_processed_purl

if index_properties_location:
self.index_properties_location = index_properties_location
Expand Down Expand Up @@ -615,6 +618,25 @@ def __init__(
self.index_location = index_download.path
self.index_increment_locations = []

self.index_checksum = self._compute_index_checksum()

    def _compute_index_checksum(self):
        """Compute SHA-256 checksum of the index file(s) for checkpoint validation."""
        locations = []
        if self.index_location:
            locations.append(self.index_location)
        # NOTE(review): assumes index_increment_locations holds filesystem
        # paths openable with open() — confirm they are not download objects
        # that carry a .path attribute instead.
        locations.extend(self.index_increment_locations)

        # No index downloaded at all: no checksum to compare against.
        if not locations:
            return None

        sha256 = hashlib.sha256()
        # Sort so the digest is independent of discovery/download order.
        for location in sorted(locations):
            with open(location, "rb") as f:
                # Stream in 8 KiB chunks to avoid loading large indexes in memory.
                for chunk in iter(lambda: f.read(8192), b""):
                    sha256.update(chunk)
        return sha256.hexdigest()

def __del__(self):
if self.downloads:
for download in self.downloads:
Expand Down Expand Up @@ -658,8 +680,9 @@ def _fetch_index_increments(self, last_incremental):
index_increment_downloads.append(index_increment)
return index_increment_downloads

def _get_packages(self, content=None):
def _get_packages(self, content=None, last_processed_purl=None):
artifacts = get_artifacts(content, worthyness=is_worthy_artifact)
skipping = bool(last_processed_purl)

for artifact in artifacts:
# we cannot do much without these
Expand Down Expand Up @@ -723,17 +746,27 @@ def _get_packages(self, content=None):
name=artifact_id,
version=version,
)

if skipping:
if str(current_purl) == last_processed_purl:
skipping = False
continue

self.last_processed_purl = str(current_purl)
yield current_purl, [package.purl]

def _get_packages_from_index_increments(self):
for index_increment in self.index_increment_locations:
yield self._get_packages(content=index_increment)

def get_packages(self):
    def get_packages(self, last_processed_purl=None):
        """Yield Package objects from maven index or index increments"""
        packages = []
        if self.index_increment_locations:
            # NOTE(review): chain() flattens only one level — confirm that
            # _get_packages_from_index_increments yields package tuples and
            # not nested generators. Also note the resume cursor
            # (last_processed_purl) is not applied on this incremental path.
            packages = chain(self._get_packages_from_index_increments())
        elif self.index_location:
            # Full-index path: supports resuming after last_processed_purl.
            packages = self._get_packages(
                content=self.index_location,
                last_processed_purl=last_processed_purl,
            )
        return packages
8 changes: 4 additions & 4 deletions minecode_pipelines/tests/data/alpine/expected_packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
"type": "apk",
"namespace": "alpine",
"name": "prspkt",
"name": "2bwm",
"version": "0.3-r2",
"qualifiers": {
"arch": "x86_64",
Expand Down Expand Up @@ -75,12 +75,12 @@
"repository_download_url": null,
"api_data_url": null,
"datasource_id": "alpine_metadata",
"purl": "pkg:apk/alpine/prspkt@0.3-r2?arch=x86_64&distro=v3.22"
"purl": "pkg:apk/alpine/2bwm@0.3-r2?arch=x86_64&distro=v3.22"
},
{
"type": "apk",
"namespace": "alpine",
"name": "prspkt",
"name": "2bwm-doc",
"version": "0.3-r2",
"qualifiers": {
"arch": "x86_64",
Expand Down Expand Up @@ -153,6 +153,6 @@
"repository_download_url": null,
"api_data_url": null,
"datasource_id": "alpine_metadata",
"purl": "pkg:apk/alpine/prspkt@0.3-r2?arch=x86_64&distro=v3.22"
"purl": "pkg:apk/alpine/2bwm-doc@0.3-r2?arch=x86_64&distro=v3.22"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#Mon Oct 29 01:28:33 UTC 2018
nexus.index.id=central
nexus.index.chain-id=1318453614498
nexus.index.timestamp=20181029012159.470 +0000
nexus.index.last-incremental=445
nexus.index.time=20120615133728.952 +0000
49 changes: 49 additions & 0 deletions minecode_pipelines/tests/pipes/test_alpine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#

import os
from unittest.mock import patch

from commoncode.testcase import check_against_expected_json_file
from commoncode.testcase import FileBasedTesting
Expand All @@ -27,3 +28,51 @@ def test_parse_apkindex_and_build_package(self):
packages.append(pd.to_dict())
expected_loc = self.get_test_loc("alpine/expected_packages.json")
check_against_expected_json_file(packages, expected_loc, regen=False)

    def test_mine_and_publish_skips_processed_indexes(self):
        """Test that already-processed index URLs are skipped."""
        all_urls = alpine.ALPINE_LINUX_APKINDEX_URLS
        # Mark every URL except the first as already processed.
        processed_indexes = set(all_urls[1:])
        calls = []

        def mock_mine_and_publish(**kwargs):
            # Record one entry per index actually mined.
            calls.append(kwargs.get("packageurls"))

        with patch(
            "minecode_pipelines.pipes.alpine._mine_and_publish_packageurls",
            side_effect=mock_mine_and_publish,
        ):
            alpine.mine_and_publish_alpine_packageurls(
                data_cluster=None,
                checked_out_repos={},
                working_path=None,
                commit_msg_func=lambda *a, **kw: "test",
                logger=lambda msg: None,
                processed_indexes=processed_indexes,
            )
        # Only the single unprocessed index should have been mined.
        self.assertEqual(len(calls), 1)

def test_mine_and_publish_updates_processed_indexes(self):
"""Test that processed_indexes is updated after each index."""
processed_indexes = set()

with patch(
"minecode_pipelines.pipes.alpine._mine_and_publish_packageurls",
):
original_urls = alpine.ALPINE_LINUX_APKINDEX_URLS
alpine.ALPINE_LINUX_APKINDEX_URLS = original_urls[:2]
try:
alpine.mine_and_publish_alpine_packageurls(
data_cluster=None,
checked_out_repos={},
working_path=None,
commit_msg_func=lambda *a, **kw: "test",
logger=lambda msg: None,
processed_indexes=processed_indexes,
)
finally:
alpine.ALPINE_LINUX_APKINDEX_URLS = original_urls

self.assertEqual(len(processed_indexes), 2)
self.assertIn(original_urls[0], processed_indexes)
self.assertIn(original_urls[1], processed_indexes)
2 changes: 1 addition & 1 deletion minecode_pipelines/tests/pipes/test_conan.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


class ConanPipelineTests(TestCase):
def test_collect_packages_from_conan_calls_write(self, mock_write):
def test_collect_packages_from_conan_calls_write(self):
packages_file = DATA_DIR / "cairo" / "cairo-config.yml"
expected_file = DATA_DIR / "expected-cairo-purls.yml"

Expand Down
Loading