If isinstance(dag._schedule_interval, six.string_types): Self.bag_dag(dag, parent_dag=dag, root_dag=dag) If not head and (ext = '.py' or ext = '.pyc'): Mod_name, ext = os.path.splitext(mod.filename) With timeout(( 'core', "DAGBAG_IMPORT_TIMEOUT")): Org_mod_name, _ = os.path.splitext(os.path.split(filepath)) Is_zipfile = zipfile.is_zipfile(filepath) ( "Creating / updating %s in ORM", ti)Īirflow/models/_init_.py#DagBag.process_fileĭef process_file(self, filepath, only_if_updated=True, safe_mode=True): # Also save this task instance to the DB. All tasks in the # scheduled state will be sent to the executor # Task starts out in the scheduled state. Ti.refresh_from_db(session=session, lock_for_update=True)ĭep_context = DepContext(deps=QUEUE_DEPS, ignore_task_deps=True) Self._process_dags(dagbag, dags, ti_keys_to_schedule) # Not using multiprocessing.Queue() since it's no longer a separate # process and due to some unusual behavior. Simple_dags.append(SimpleDag(dag, pickle_id=pickle_id))ĭags = [dag for dag in () # Only return DAGs that are not paused if dag_id not in paused_dag_ids: # Pickle the DAGs (if necessary) and put them into a SimpleDag for dag_id in dagbag.dags: # Save individual DAGs in the ORM and update DagModel.last_scheduled_time for dag in ():
0 Comments
Leave a Reply. |
AuthorWrite something about yourself. No need to be fancy, just an overview. ArchivesCategories |