# Copyright 2023 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from pathlib import Path
from types import MethodType
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from covalent._shared_files import logger
from .base import BaseTrigger
app_log = logger.app_log
log_stack_info = logger.log_stack_info
class DirEventHandler(FileSystemEventHandler):
def __init__(self) -> None:
self.supported_event_to_func_names = {
"created": "on_created",
"deleted": "on_deleted",
"modified": "on_modified",
"moved": "on_moved",
"closed": "on_closed",
}
[docs]class DirTrigger(BaseTrigger):
"""
Directory or File based trigger which watches for events in said file/dir
and performs a trigger action whenever they happen.
Args:
dir_path: Path to the file/dir which is to be observed for events
event_names: List of event names on which to perform the trigger action.
Possible options can be a subset of: `["created", "deleted", "modified", "moved", "closed"]`.
batch_size: The number of changes to wait for before performing the trigger action, default is 1.
recursive: Whether to recursively watch the directory, default is False.
Attributes:
self.dir_path: Path to the file/dir which is to be observed for events
self.event_names: List of event names on which to perform the trigger action.
Possible options can be a subset of: `["created", "deleted", "modified", "moved", "closed"]`
self.batch_size: The number of events to wait for before performing the trigger action, default is `1`.
self.recursive: Whether to recursively watch the directory, default is False.
self.n_changes: Number of events since last trigger action. Whenever `self.n_changes == self.batch_size` a trigger action happens.
"""
def __init__(
self,
dir_path,
event_names,
batch_size: int = 1,
recursive: bool = False,
lattice_dispatch_id: str = None,
dispatcher_addr: str = None,
triggers_server_addr: str = None,
):
super().__init__(lattice_dispatch_id, dispatcher_addr, triggers_server_addr)
self.dir_path = dir_path
if isinstance(event_names, str):
event_names = [event_names]
self.event_names = event_names
self.batch_size = batch_size
self.recursive = recursive
self.observe_blocks = False
self.event_handler = None
[docs] def attach_methods_to_handler(self) -> None:
"""
Dynamically attaches and overrides the "on_*" methods to the handler
depending on which ones are requested by the user.
Args:
event_names: List of event names upon which to perform a trigger action
"""
app_log.warning("Attaching methods to dir handler")
self.n_changes = 0
def proxy_trigger(_, event_object):
self.n_changes += 1
if self.n_changes == self.batch_size:
self.trigger()
self.n_changes = 0
for en in self.event_names:
func_name = self.event_handler.supported_event_to_func_names[en]
proxy_trigger.__name__ = func_name
setattr(self.event_handler, func_name, MethodType(proxy_trigger, self.event_handler))
[docs] def observe(self) -> None:
"""
Start observing the file/dir for any possible events among the ones
mentioned in `self.event_names`.
Currently only supports running within the Covalent/Triggers server.
"""
app_log.warning(f"In DirTrigger's observe, dir path is: {self.dir_path}")
# Resolving the path at the place where observation will happen
self.dir_path = str(Path(self.dir_path).expanduser().resolve())
self.event_loop = asyncio.get_running_loop()
self.event_handler = DirEventHandler()
# Attach methods before scheduling the observer
self.attach_methods_to_handler()
self.observer = Observer()
print("Schedule is: ", type(self.observer.schedule))
self.observer.schedule(self.event_handler, self.dir_path, recursive=self.recursive)
self.observer.start()
[docs] def stop(self) -> None:
"""
Stop observing the file or directory for changes.
"""
self.observer.stop()
self.observer.join()