# Source code for critcatworks.database.update
from fireworks import Firework, FWorker, LaunchPad, PyTask, ScriptTask, TemplateWriterTask, FileTransferTask, Workflow
from fireworks.core.rocket_launcher import launch_rocket, rapidfire
from fireworks.queue.queue_launcher import launch_rocket_to_queue
from fireworks.user_objects.queue_adapters.common_adapter import *
import os,time, pathlib, sys
from fireworks import explicit_serialize, FiretaskBase, FWAction
from fireworks.user_objects.firetasks.dataflow_tasks import ForeachTask
from pprint import pprint as pp
import ase, ase.io
import logging, datetime
from critcatworks.database.extdb import update_workflows_collection
import numpy as np
@explicit_serialize
class InitialTask(FiretaskBase):
    """
    Custom Firetask to initialize a new workflow instance
    in the database.
    Additionally, initializes a few entries in the fw_spec.

    Required parameters:
        username (str) : username for the external database
        password (str) : password for the external database
        parameters (dict) : workflow-specific input parameters
        name (str) : custom name of the workflow
        workflow_type (str) : custom workflow type

    Optional parameters:
        extdb_connect (dict) : connection settings; the keys host, db_name
                               and authsource are filled with defaults
                               when they are missing.
    """
    _fw_name = 'InitialTask'
    required_params = ['username', 'password', 'parameters', 'name', 'workflow_type']
    optional_params = ['extdb_connect']

    def run_task(self, fw_spec):
        """
        Register the workflow via update_workflows_collection and seed
        the fw_spec with the entries used by later Fireworks.

        Args:
            fw_spec (dict) : spec of the running Firework. It is modified
                             in place and also passed on as update_spec.

        Returns:
            FWAction : action whose update_spec is the modified fw_spec
        """
        username = self["username"]
        password = self["password"]
        parameters = self["parameters"]
        name = self["name"]
        workflow_type = self["workflow_type"]

        # extdb_connect is optional, so it must not be read with [] (that
        # raises KeyError when the parameter was omitted). Work on a copy
        # so the credentials are not written into the task's own parameters.
        extdb_connect = dict(self.get("extdb_connect") or {})

        creation_time = str(datetime.datetime.now(tz=None))

        extdb_connect["username"] = username
        extdb_connect["password"] = password
        # Fill in connection defaults only where the caller gave no value.
        extdb_connect.setdefault("host", "nanolayers.dyndns.org:27017")
        extdb_connect.setdefault("db_name", "testdb")
        # The authentication database defaults to the target database.
        extdb_connect.setdefault("authsource", extdb_connect["db_name"])

        workflow = update_workflows_collection(username, password,
            creation_time, parameters = parameters,
            name = name, workflow_type = workflow_type, extdb_connect = extdb_connect)

        update_spec = fw_spec
        update_spec["temp"] = {}
        update_spec["simulations"] = {}
        update_spec["workflow"] = workflow
        update_spec["machine_learning"] = {}
        update_spec["extdb_connect"] = extdb_connect
        update_spec["temp"]["calc_analysis_ids_dict"] = {}
        update_spec["analysis_ids"] = []
        # Drop the entries specific to this Firework from the forwarded
        # spec; tolerate specs that never contained them.
        update_spec.pop("_category", None)
        update_spec.pop("name", None)
        return FWAction(update_spec=update_spec)
def initialize_workflow_data(username, password, parameters, name = "UNNAMED",
        workflow_type = "UNNAMED", extdb_connect = None):
    """
    Creates a custom Firework object to initialize the workflow.
    It updates the workflow collection and makes a few entries in
    the fw_spec.

    Args:
        username (str) : username for the mongodb database
        password (str) : password for the mongodb database
        parameters (dict) : workflow-specific input parameters
        name (str) : custom name of the workflow
        workflow_type (str) : custom workflow type
        extdb_connect (dict): dictionary optionally containing the keys host,
                              authsource and db_name. All fields have a default
                              value. If omitted, an empty dictionary is used.

    Returns:
        Firework object : InitialWork
    """
    # Use a fresh dict per call instead of a shared mutable default, so
    # connection data can never leak between independent workflows.
    if extdb_connect is None:
        extdb_connect = {}

    firetask1 = InitialTask(username = username, password = password,
        parameters = parameters, name = name,
        workflow_type = workflow_type, extdb_connect = extdb_connect)
    fw = Firework([firetask1], spec = {'_category' : "lightweight", 'name' : 'InitialTask'},
        name = 'InitialWork')
    return fw