""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import asyncio import time from contextlib import suppress from pathlib import Path from defence360agent.contracts.config import Core from defence360agent.contracts.permissions import ( ms_imunify_patch_eligible_to_purchase, ) from defence360agent.contracts.plugins import MessageSource from defence360agent.utils import recurring_check from imav.api.cleanup_revert import ( CleanupRevertAPI as PatchRevertAPI, ) from imav.malwarelib.config import VulnerabilityHitStatus from imav.malwarelib.model import VulnerabilityHit from imav.malwarelib.vulnerabilities.storage import restore_hits PERIOD = 3 * 60 * 60 # 3 hours class PatchRevertPlugin(MessageSource): def __init__(self): self._patch_revert_flag = Path(Core.TMPDIR) / "patch_revert" self.initiator = "imunify" self._task = None async def create_source(self, loop, sink): self._sink = sink self._task = loop.create_task( recurring_check(PERIOD)(self.process_patch_revert)() ) async def shutdown(self): if self._task: self._task.cancel() with suppress(asyncio.CancelledError): await self._task async def process_patch_revert(self): if not await ms_imunify_patch_eligible_to_purchase(): return if ( self._patch_revert_flag.exists() and (self._patch_revert_flag.stat().st_mtime + PERIOD) > time.time() ): # don't try to make requests too often return files, _ = await PatchRevertAPI.paths() if files: if hits := list( VulnerabilityHit.get_hits( files, statuses=[VulnerabilityHitStatus.PATCHED] ) ): await restore_hits(hits, self.initiator) self._patch_revert_flag.touch(mode=0o644, exist_ok=True)