# Akil Ram
# Questions
1) Brainstorming
1) We will not use Airflow anymore, am I right?
2) All tasks will run in Kubernetes pods.
3) Decide the flow (how it will work); give an example (BigQuery).
BigQuery
[bigquery, sheet]
sheet <-> bigquery (copy data)
service communication happens here
Points:
1) POD: Job deployment type (service communication)
2) The pod will be in sleep mode.
Architecture:
1) decentralized
pubsub:
topics and subscribers
server_api -> single_topic -> [bigquery_subscriber, sheet_subs]
metadata will decide where to process
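A minimal sketch of that fan-out, assuming the google-cloud-pubsub client; the project id, topic, and attribute names are placeholders. The server publishes one message to the single topic, and each subscriber (bigquery_subscriber, sheet_subs) filters on the task attribute carried in the message metadata:
```python
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "single_topic")  # placeholder names

# Attributes carry the routing metadata; the payload carries the task parameters.
payload = json.dumps({"dataset": "test", "table": "test"}).encode("utf-8")
future = publisher.publish(topic_path, data=payload, task="bigquery")
print("published message id:", future.result())
```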
### command
```
docker build -f DockerfileCommon -t startkiller .
```
### Current Scope:
Task:
1. BigQuery
1.1 bigquery_function
1.2 bigquery_run
1.3 bigquery_values
1.4 bigquery_query
1.5 bigquery_bucket
bigquery_query: execute a query and write the results to a table.
[update: 5 july 2022]
Questions:
1. How will other services talk to this job?
2. How will the pod be triggered?
3. How will we pass parameters?
Answers from the team:
1. Need to research how to initialize a pod: will it be via the command line or through API commands?
2. Currently it is command line, running in Compute Engine.
3. Need to research how we can initialize it on GKE using commands.
4. There will be a microservice running that will invoke the command (or whatever task it has to run); how to pass the command-line arguments with it still has to be researched.
```
python main.py --project test --dataset test --table test -s '{ "setup":{ "auth":{ "service":"[JSON OR PATH]" }}}'
```
[update: 7 july 2022]
How do we pass the service account and config file?
The task gets a reference to the Secret Manager entry (its tuple); Secret Manager code then fetches the service-account value and passes it to the task itself.
The microservice on the tenant side would do that work: the task would already have everything (like the recipe JSON), and along with that task info it will also set the service-account file, which should come from Secret Manager.
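A sketch of that lookup, assuming the google-cloud-secret-manager client and a hypothetical secret name; the fetched payload is what gets handed to the task:
```python
from google.cloud import secretmanager

client = secretmanager.SecretManagerServiceClient()
# hypothetical project/secret names; the real reference comes from the task metadata
name = "projects/my-project/secrets/task-service-account/versions/latest"
response = client.access_secret_version(request={"name": name})
service_account_json = response.payload.data.decode("UTF-8")
```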
Need to research how to initialize a pod: command line or API commands?
There would be a command to run the pods themselves. We can use both the command line and API commands. If it's a service API, ideally we will use the GKE API.
Currently it is command line, running in Compute Engine.
Need to research how we can initialize it on GKE using commands.
There will be a microservice running that will invoke the command (or whatever task it has to run); how to pass the command-line arguments with it still has to be researched (see entry.py under 14 july below).
update [12 july 2022]
API -> create Batch Job pod -> complete task -> release resources
How will parameters be passed?
Ans:
Deploy pods on different projects?
Can we use Cloud Run?
1. We can deploy the docker build there.
2. Pass arguments to that docker image.
3. It will spin up, process, and shut down when not in use.
How will it run?
Steps:
1. Get the request at the API.
2. The API creates a pod and submits the job to it with dynamic arguments.
3. When the job completes, after some seconds (configurable) the resources are released, as in the Job manifest below.
```yaml
# save this as simple-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: batch-job-example-1
  labels:
    jobgroup: bigquery-task
spec:
  ttlSecondsAfterFinished: 30
  template:
    metadata:
      name: bigquery-task
      labels:
        jobgroup: jobexample
    spec:
      restartPolicy: OnFailure
      containers:
        - name: my-job
          image: alpine
          imagePullPolicy: IfNotPresent
          command: ['sh', '-c', 'echo First Batch Job; sleep 10']
```
To create the pod:
```
kubectl apply -f simple-job.yaml
```
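To watch it run and confirm the resources get released, the Job and its pod can be checked with standard kubectl commands (names and labels match the manifest above):
```
kubectl get jobs -l jobgroup=bigquery-task
kubectl logs job/batch-job-example-1
```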
update [14 july 2022]
entry.py
```python
import uuid
import argparse

from task_executer import Kubernetes
from kubernetes import client

if __name__ == "__main__":
    parser = argparse.ArgumentParser("Task Executor")
    parser.add_argument("input_string", help="Input String", type=str)
    args = parser.parse_args()

    job_id = uuid.uuid4()
    pod_id = job_id

    # Steps 1 to 3 are the equivalent of ./manifestfiles/shuffler_job.yaml

    # Kubernetes instance
    k8s = Kubernetes()

    # STEP 1: CREATE A CONTAINER
    _image = "shuffler:latest"
    _name = "shuffler"
    _pull_policy = "IfNotPresent"
    shuffler_container = k8s.create_container(_image, _name, _pull_policy, args.input_string)

    # STEP 2: CREATE A POD TEMPLATE SPEC
    _pod_name = f"bigquery-{pod_id}"
    _pod_spec = k8s.create_pod_template(_pod_name, shuffler_container)

    # STEP 3: CREATE A JOB
    _job_name = f"my-job-{job_id}"
    _job = k8s.create_job(_job_name, _pod_spec)

    # STEP 4: CREATE THE NAMESPACE
    _namespace = "testns"
    k8s.create_namespace(_namespace)

    # STEP 5: EXECUTE THE JOB
    batch_api = client.BatchV1Api()
    batch_api.create_namespaced_job(_namespace, _job)
```
shuffler.py
```python=
import random
import logging
import argparse

logging.basicConfig(level=logging.INFO)


def name_shuffler(input_string):
    if isinstance(input_string, str):
        if len(input_string) < 2:
            raise Exception("Must be at least 2 characters.")
        logging.info(f"Original string: {input_string}")
        str_var = list(input_string)
        random.shuffle(str_var)
        logging.info(f"Shuffled string: {''.join(str_var)}")
    else:
        raise Exception("Input must be a string.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser("Shuffler")
    parser.add_argument("input_string", help="Input String", type=str)
    args = parser.parse_args()
    name_shuffler(args.input_string)
```
```dockerfile=
FROM continuumio/miniconda3:4.6.14
RUN set -ex; \
apt-get update \
&& apt-get -y install --no-install-recommends build-essential=* python-dev=* make=* \
&& rm -rf /var/lib/apt/lists/*
ENV PYTHONDONTWRITEBYTECODE=1
RUN pip install Kubernetes
COPY shuffler.py .
CMD ["python", "-u", "shuffler.py"]
```
Pinned dependency: kubernetes==24.2.0
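The image referenced by entry.py has to exist locally; building it from the Dockerfile above would look like this (the tag is an assumption chosen to match entry.py):
```
docker build -t shuffler:latest .
```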
task_executer.py
```python=
import logging

from kubernetes import client
from kubernetes import config

logging.basicConfig(level=logging.INFO)
config.load_kube_config()


class Kubernetes:
    def __init__(self):
        # Init Kubernetes API clients
        self.core_api = client.CoreV1Api()
        self.batch_api = client.BatchV1Api()

    def create_namespace(self, namespace):
        namespaces = self.core_api.list_namespace()
        all_namespaces = [ns.metadata.name for ns in namespaces.items]

        if namespace in all_namespaces:
            logging.info(f"Namespace {namespace} already exists. Reusing.")
        else:
            namespace_metadata = client.V1ObjectMeta(name=namespace)
            self.core_api.create_namespace(
                client.V1Namespace(metadata=namespace_metadata)
            )
            logging.info(f"Created namespace {namespace}.")

        return namespace

    @staticmethod
    def create_container(image, name, pull_policy, args):
        container = client.V1Container(
            image=image,
            name=name,
            image_pull_policy=pull_policy,
            args=[args],
            command=["python", "-u", "shuffler.py"],
        )
        logging.info(
            f"Created container with name: {container.name}, "
            f"image: {container.image} and args: {container.args}"
        )
        return container

    @staticmethod
    def create_pod_template(pod_name, container):
        pod_template = client.V1PodTemplateSpec(
            spec=client.V1PodSpec(restart_policy="OnFailure", containers=[container]),
            metadata=client.V1ObjectMeta(
                name=pod_name,
                labels={"pod_name": pod_name, "jobgroup": "bigquery-task"},
            ),
        )
        return pod_template

    @staticmethod
    def create_job(job_name, pod_template):
        metadata = client.V1ObjectMeta(name=job_name, labels={"job_name": job_name})
        job = client.V1Job(
            api_version="batch/v1",
            kind="Job",
            metadata=metadata,
            spec=client.V1JobSpec(
                backoff_limit=0,
                ttl_seconds_after_finished=30,
                template=pod_template,
            ),
        )
        return job
```
```
python entry.py "Kubernetes"
```
update [15 july 2022]
Load the locally built image into minikube so the cluster can run it without a registry pull (pairs with imagePullPolicy: IfNotPresent):
```cmd=
minikube image load "shuffle:0.1"
```
---------------------------
#### CODE
https://minikube.sigs.k8s.io/docs/start/
```python=
resources=client.V1ResourceRequirements(
    requests={"cpu": "100m", "memory": "200Mi"},
    limits={"cpu": "500m", "memory": "500Mi"},
)
```
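These limits attach to the container spec; a sketch of where they would go in create_container from task_executer.py above (same kubernetes client API):
```python
container = client.V1Container(
    image="shuffler:latest",
    name="shuffler",
    command=["python", "-u", "shuffler.py"],
    # cap the pod so a runaway task cannot starve the node
    resources=client.V1ResourceRequirements(
        requests={"cpu": "100m", "memory": "200Mi"},
        limits={"cpu": "500m", "memory": "500Mi"},
    ),
)
```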
```
gcloud auth activate-service-account --key-file=/home/akhil/Downloads/spidr-coreapp-dev-ec4223012dbd.json
```
update [26 july 2022]
TODO: have to run multiple tasks.
```
gcloud container clusters get-credentials test --zone=northamerica-northeast2
```
update [04 aug 2022]
```python=
import sys

from bigquery.helper import initialize_project
from pydantic import BaseModel, Field
from project import project


class BQTaskRequest(BaseModel):
    # bq parameters
    dataset: str = Field(default=None, description='Cloud ID of Google Cloud Project.')
    table: str = Field(default=None)
    # excel parameters
    excel_sheet: str = None
    excel_workbook: str = None
    data_schema: str = None
    # project parameters
    recipe: str = Field(default=None, alias='recipe', description='Path to recipe json file to load.')
    project: str = None
    key: str = Field(default=None, description='API Key of Google Cloud Project.')
    user: str = Field(default=None, description='Path to USER credentials json file.')
    service: str = Field(default=None, description='Path to SERVICE credentials json file.')
    client: dict = Field(default=None, description='Path to CLIENT credentials json file.')
    instance: int = Field(default=1, description='Instance number of the task to run ( for tasks with same name ) '
                                                 'starting at 1.')
    verbose: str = Field(default=None, description='Print all the steps as they happen.')
    force: str = Field(default=None, description='Not used but included for compatibility with another script.')
    trace_print: str = Field(default=None, description='Execution trace written to stdout.')
    trace_file: str = Field(default=None, description='Execution trace written to file.')
    # other
    csv: str = None
    task_name: str = None

    class Config:
        allow_population_by_field_name = True


if __name__ == '__main__':
    # arguments arrive as key=value pairs and are parsed into the request model
    args = sys.argv[1:]
    print("-------", args)
    kwarg = {}
    if args:
        for arg in args:
            key, value = arg.split("=")
            kwarg[key] = value
    job_parameters = BQTaskRequest(**kwarg)
    project.from_parameter(request=job_parameters)
    initialize_project()
```
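Invocation is then key=value pairs, split by the loop above (the values and path here are placeholders):
```
python main.py dataset=test table=test recipe=/path/to/recipe.json
```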
pydantic==1.9.1
project file:
```python
###########################################################################
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###########################################################################

"""The core singleton class of StarThinker that translates json to python.

Project loads JSON and parameters and combines them for execution. It handles
three important concepts:

  1. Load the JSON and make all task parameters available to python scripts.
  2. Load authentication, all three parameters are optional if scripts do not
     use them. The following parameters can be passed for authentication.

       user.json - user credentials json ( generated from client ), is
         refreshed by StarThinker as required. Can be provided as a local
         path or a Cloud Bucket Storage path for distributed jobs.
       service.json - service credentials json ( generated from cloud
         project ). Passed as a local path or an embedded json object for
         distributed jobs.
       client.json - client credentials json ( generated from cloud
         project ). Also requires a user json path which will be written to
         after client authentication. Once authenticated this client is not
         required.

     Credentials can be specified in one of three ways for maximum flexibility:

     A. Specify credentials on command line (highest priority if given)
        --user / -u = user credentials path
        --client / -c = client credentials path (requires user credentials path)
        --service / -s = service credentials path

     B. Define credentials paths in JSON (overruled by command line)
        In each json file create the following entry (client, or user, or
        service):

          {
            "setup":{
              "id":"[cloud project id]",
              "auth":{
                "client":"[/home/.credentials/hello-world_client.json]",
                "service":"[/home/.credentials/hello-world_service.json]",
                "user":"[/home/.credentials/hello-world_user.json]"
              }
            }
          }

     C. Use default credentials ( lowest priority, last resort )
        If neither the json nor the command line provides a path, the
        environment variable GOOGLE_APPLICATION_CREDENTIALS will be
        used for service accounts. It is created by google cloud
        utilities.
"""

import os
import re
import json
import pytz
import argparse
import textwrap

from datetime import datetime
from importlib import import_module

from util.debug import starthinker_trace_start

EXIT_ERROR = 1
EXIT_SUCCESS = 0

RE_UUID = re.compile(r'(\s*)("setup"\s*:\s*{)')


def get_project(filepath=None, stringcontent=None):
  """Loads json for Project Class. Available as helper.

  Able to load JSON with newlines ( strips all newlines before load ).

  Args:
    - filepath: (string) The local file path to the recipe json file to load.
    - stringcontent: (string) The recipe json as a string, used when no filepath is given.

  Returns:
    Json of recipe file.
  """

  if filepath:
    with open(filepath) as recipe_file:
      stringcontent = recipe_file.read()

  try:
    # allow the json to contain newlines for formatting queries and such
    return json.loads(stringcontent.replace('\n', ' '))
  except ValueError as e:
    pos = 0
    for count, line in enumerate(stringcontent.splitlines(), 1):
      # do not add newlines, e.pos was run on a version where newlines were removed
      pos += len(line)
      if pos >= e.pos:
        e.lineno = count
        e.pos = pos
        e.args = (
          'JSON ERROR: %s LINE: %s CHARACTER: %s ERROR: %s LINE: %s' %
          (filepath, count, pos - e.pos, str(e.msg), line.strip()),
        )
        raise


def utc_to_timezone(timestamp, timezone):
  if timestamp:
    return timestamp.replace(tzinfo=pytz.utc).astimezone(
      pytz.timezone(timezone))
  else:
    return None


def is_scheduled(recipe, task={}):
  """Wrapper for day_hour_scheduled that returns True if the current time zone safe hour is in the recipe schedule.

  Used as a helper for any cron job running projects. Keeping this logic in
  project helps avoid time zone detection issues and scheduling discrepancies
  between machines.

  Args:
    - recipe: (Recipe JSON) The JSON of a recipe.
    - task: ( dictionary / JSON ) The specific task being considered for
      execution.

  Returns:
    - True if task is scheduled to run current hour, else False.
  """

  tz = pytz.timezone(
    recipe.get('setup', {}).get('timezone', 'America/Los_Angeles')
  )

  now_tz = datetime.now(tz)
  day_tz = now_tz.strftime('%a')
  hour_tz = now_tz.hour

  days = recipe.get('setup', {}).get('day', [])
  hours = [
    int(h) for h in task.get(
      'hour', recipe.get('setup', {}).get('hour', [])
    )
  ]

  if days == [] or day_tz in days:
    if hours == [] or hour_tz in hours:
      return True

  return False


class project:
  """A singleton that represents the loaded recipe within python scripts.

  All access to json scripts within StarThinker must pass through the project
  class. It handles parameters, time zones, permissions management, and
  scheduling overhead. Task function name must match JSON task name.

  Project is meant as the entry point into all StarThinker scripts as follows:

    from util_test.project import project

    @project.from_parameters
    def task():
      pass  # execute work using project.* as data from json

    if __name__ == "__main__":
      task()

  Project is meant to be used by a helper:

    import argparse
    from util_test.project import project

    if __name__ == "__main__":

      # custom parameters
      parser = argparse.ArgumentParser()
      parser.add_argument('custom', help='custom parameter to be added to standard project set.')

      # initialize project
      project.from_commandline(parser=parser)

      # access arguments
      auth = 'service' if project.args.service else 'user'
      print(project.args.custom)

  Project can also be initialized directly for non-json tasks:

    from util_test.project import project

    if __name__ == "__main__":
      var_recipe = '/somepath/recipe.json'
      var_user = '/somepath/user.json'
      var_service = '/somepath/service.json'
      project.initialize(_recipe=var_recipe, _user=var_user,
                         _service=var_service, _verbose=True)

  Attributes:

    Dealing with authentication...
    - project: (string) The Google Cloud project id.
    - user: (string) Path to the user credentials json file. It can also be a
      Google Cloud Bucket path when passed to the class directly.
    - service: (string) Path to the service credentials json file. It can
      also be a json object when passed to the project class directly.
    - client: (string) Path to the client credentials json file. It can only
      be a local path.

    Dealing with execution data...
    - instance: (integer) When executing all tasks, it is the one based index
      offset of the task to run.
    - date: (date) A specific date or 'TODAY', which is changed to today's
      date, passed to python scripts for reference.
    - hour: (integer) When executing all tasks, it is the hour if specified
      for a task to execute.

    Dealing with debugging...
    - verbose: (boolean) Prints all debug information in StarThinker code.
      See: if project.verbose: print('...').
    - force: (boolean) For recipes with specific task hours, forces all tasks
      to run regardless of hour specified.
  """

  args = None
  recipe = None
  task = None
  instance = None
  filepath = None
  verbose = False
  force = False
  function = None
  timezone = None
  now = None
  date = None
  hour = None

  @classmethod
  def from_commandline(cls, _task=None, parser=None, arguments=None):
    """Used in StarThinker scripts as entry point for command line calls.

    Loads json for execution.

    Usage example:

      import argparse
      from util_test.project import project

      if __name__ == "__main__":

        # custom parameters
        parser = argparse.ArgumentParser()
        parser.add_argument('custom', help='custom parameter to be added to standard project set.')

        # initialize project
        project.from_commandline(parser=parser)

        # access arguments
        auth = 'service' if project.args.service else 'user'
        print(project.args.custom)

    Args:
      - parser: (ArgumentParser) optional custom argument parser ( json
        argument becomes optional if not None )
      - arguments: (String) optional list of parameters to use when invoking
        project ( defaults to ALL if set to None )

    Returns:
      Nothing, this manipulates a singleton object. All calls to project.*
      result in the same object.
    """

    if parser is None:
      parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent("""\
          Command line to execute all tasks in a recipe once. ( Common Entry Point )

          This script dispatches all the tasks in a JSON recipe to handlers in sequence.
          For each task, it calls a subprocess to execute the JSON instructions, waits
          for the process to complete and dispatches the next task, until all tasks are
          complete or a critical failure ( exception ) is raised.

          If an exception is raised in any task, all following tasks are not executed by design.

          Example: python run.py [path to recipe file] --force

          Caution: This script does NOT check if the last job finished, potentially causing overruns.

          Notes:
            - To avoid running the entire script when debugging a single task, the command line
              can easily replace "all" with the name of any "task" in the json. For example
                python all/run.py project/sample/say_hello.json
            - Can be easily replaced with the following to run only the "hello" task:
                python task/hello/run.py project/sample/say_hello.json
            - Or specified further to run only the second hello task:
                python task/hello/run.py project/sample/say_hello.json -i 2
        """))

      if arguments is None or '-j' in arguments:
        parser.add_argument('json', help='Path to recipe json file to load.')

    else:
      if arguments is None or '-j' in arguments:
        parser.add_argument(
          '--json', '-j', help='Path to recipe json file to load.')

    if arguments is None or '-p' in arguments:
      parser.add_argument(
        '--project',
        '-p',
        help='Cloud ID of Google Cloud Project.',
        default=None)

    if arguments is None or '-k' in arguments:
      parser.add_argument(
        '--key',
        '-k',
        help='API Key of Google Cloud Project.',
        default=None)

    if arguments is None or '-u' in arguments:
      parser.add_argument(
        '--user',
        '-u',
        help='Path to USER credentials json file.',
        default=None)

    if arguments is None or '-s' in arguments:
      parser.add_argument(
        '--service',
        '-s',
        help='Path to SERVICE credentials json file.',
        default=None)

    if arguments is None or '-c' in arguments:
      parser.add_argument(
        '--client',
        '-c',
        help='Path to CLIENT credentials json file.',
        default=None)

    if arguments is None or '-i' in arguments:
      parser.add_argument(
        '--instance',
        '-i',
        help='Instance number of the task to run ( for tasks with same name ) starting at 1.',
        default=1,
        type=int)

    if arguments is None or '-v' in arguments:
      parser.add_argument(
        '--verbose',
        '-v',
        help='Print all the steps as they happen.',
        action='store_true')

    if arguments is None or '-f' in arguments:
      parser.add_argument(
        '--force',
        '-force',
        help='Not used but included for compatibility with another script.',
        action='store_true')

    if arguments is None or '-tp' in arguments:
      parser.add_argument(
        '--trace_print',
        '-tp',
        help='Execution trace written to stdout.',
        action='store_true')

    if arguments is None or '-tf' in arguments:
      parser.add_argument(
        '--trace_file',
        '-tf',
        help='Execution trace written to file.',
        action='store_true')

    cls.args, unknown = parser.parse_known_args()

    # initialize the project singleton with passed in parameters
    cls.initialize(
      get_project(cls.args.json) if hasattr(cls.args, 'json') else {},
      _task,
      getattr(cls.args, 'instance', None),
      getattr(cls.args, 'project', None),
      getattr(cls.args, 'user', None),
      getattr(cls.args, 'service', None),
      getattr(cls.args, 'client', None),
      getattr(cls.args, 'json', None),
      getattr(cls.args, 'key', None),
      getattr(cls.args, 'verbose', None),
      getattr(cls.args, 'force', None),
      getattr(cls.args, 'trace_print', None),
      getattr(cls.args, 'trace_file', None)
    )

  @classmethod
  def from_parameter(cls, request):
    cls.args = request
    parameters = request.dict()
    print(parameters.get('recipe'))

    cls.initialize(
      get_project(parameters.get('recipe')) if parameters.get('recipe') else {},
      parameters.get('task_name', None),
      parameters.get('instance', None),
      parameters.get('project', None),
      parameters.get('user', None),
      parameters.get('service', None),
      parameters.get('client', None),
      parameters.get('recipe', None),
      parameters.get('key', None),
      parameters.get('verbose', None),
      parameters.get('force', None),
      parameters.get('trace_print', None),
      parameters.get('trace_file', None)
    )

  @classmethod
  def get_task_index(cls):
    i = 0
    for c, t in enumerate(cls.recipe.get('tasks', [])):
      if next(iter(t.keys())) == cls.function:
        i += 1
        if i == cls.instance:
          return c
    return None

  @classmethod
  def get_task(cls):
    #if cls.task is None:
    i = cls.get_task_index()
    cls.task = None if i is None else next(
      iter(cls.recipe['tasks'][i].values()))
    return cls.task

  @classmethod
  def set_task(cls, function, parameters):
    if cls.task is None:
      cls.recipe['tasks'].append({function: parameters})
      cls.function = function
      cls.task = parameters
      cls.instance = 1
    else:
      i = cls.get_task_index()
      cls.recipe['tasks'][i] = {function: parameters}
      cls.function = function
      cls.task = parameters
      cls.instance = sum([
        1 for c, t in enumerate(cls.recipe['tasks'])
        if t == function and c <= i
      ])

  @staticmethod
  def from_parameters(func):
    """Initializes a project singleton for execution by a task.

    Either loads parameters (recipe, instance) passed to the task
    programmatically, or if no parameters are passed, attempts to load them
    from the command line. Uses the decorator pattern; the task name is
    inferred from the function being decorated.

    Args:
      - recipe: (dict) JSON object representing the project ( setup plus at
        least one task )
      - instance: (integer) numeric offset of task to run if multiple calls to
        this task exist
    """

    def from_parameters_wrapper(recipe=None, instance=1):
      if recipe:
        project.initialize(
          _recipe=recipe, _task=func.__name__, _instance=instance)
      else:
        project.from_commandline(func.__name__)
      func()

    return from_parameters_wrapper

  @classmethod
  def initialize(cls,
                 _recipe={},
                 _task=None,
                 _instance=1,
                 _project=None,
                 _user=None,
                 _service=None,
                 _client=None,
                 _filepath=None,
                 _key=None,
                 _verbose=False,
                 _force=False,
                 _trace_print=False,
                 _trace_file=False):
    """Used in StarThinker scripts as programmatic entry point.

    Sets up the project singleton for execution of a task; be sure to mimic
    defaults in the helper. This function loads credentials from various
    sources ( command line argument, json, default credentials ) and also
    sets up a time zone aware date and various helper flags such as force
    and verbose.

    Usage example:

      from util_test.project import project

      if __name__ == "__main__":
        user = 'user.json'
        service = 'service.json'
        recipe = {'setup':..., 'tasks':.... }

        project.initialize(
          _recipe=recipe,
          _user=user,
          _service=service,
          _verbose=True
        )

    Args:
      - _recipe: (dict) JSON object representing the project ( setup plus at
        least one task )
      - _task: (string) Task name from recipe json task list to execute.
      - _instance: (integer) See module description.
      - _project: (string) See module description.
      - _user: (string) See module description.
      - _service: (string) See module description.
      - _client: (string) See module description.
      - _key: (string) See module description.
      - _verbose: (boolean) See module description.
      - _force: (boolean) See module description.
      - _trace_print: (boolean) True if writing execution trace to stdout.
      - _trace_file: (boolean) True if writing execution trace to file ( see
        config.py ).

    Returns:
      Nothing, this manipulates a singleton object. All calls to project.*
      result in the same object.
    """

    starthinker_trace_start(_trace_print, _trace_file)

    cls.recipe = _recipe
    cls.function = _task
    cls.instance = _instance
    cls.force = _force

    # populates the task variable based on function and instance
    cls.get_task()

    cls.verbose = _verbose
    cls.filepath = _filepath

    # add setup to json if not provided and load command line credentials if given
    if 'setup' not in cls.recipe:
      cls.recipe['setup'] = {}

    if 'auth' not in cls.recipe['setup']:
      cls.recipe['setup']['auth'] = {}

    if _service:
      cls.recipe['setup']['auth']['service'] = _service

    if _client:
      cls.recipe['setup']['auth']['client'] = _client

    # if user explicitly specified by command line
    if _user:
      cls.recipe['setup']['auth']['user'] = _user
    # or if user not given, then try default credentials ( disabled security risk to assume on behalf of recipe )
    #elif not cls.recipe['setup']['auth'].get('user'):
    #  cls.recipe['setup']['auth']['user'] = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', None)

    # if project id given, use it
    if _project:
      cls.recipe['setup']['id'] = _project

    if _key:
      cls.recipe['setup']['key'] = _key

    # TBD: if project id not given, use service credentials
    #elif not cls.recipe['setup'].get('id') and cls.recipe['setup']['auth'].get('service'):
    # TBD: if project id not given, use client credentials
    #elif not cls.recipe['setup'].get('id') and cls.recipe['setup']['auth'].get('client'):

    cls.id = cls.recipe['setup'].get('id')
    cls.key = cls.recipe['setup'].get('key')

    # find date based on timezone
    cls.timezone = pytz.timezone(
      cls.recipe['setup'].get(
        'timezone',
        'America/Los_Angeles'
      )
    )

    cls.now = datetime.now(cls.timezone)
    cls.date = cls.now.date()
    cls.hour = cls.now.hour

    if cls.verbose:
      print('TASK:', _task or 'all')
      print('DATE:', cls.now.date())
      print('HOUR:', cls.now.hour)

  @classmethod
  def execute(cls, _force=False):
    """Run all the tasks in a project in one sequence.

      from util_test.project import project

      if __name__ == "__main__":
        var_user = '[USER CREDENTIALS JSON STRING OR PATH]'
        var_service = '[SERVICE CREDENTIALS JSON STRING OR PATH]'
        var_recipe = {
          "tasks":[
            { "dataset":{
              "auth":"service",
              "dataset":"Test_Dataset"
            }}
          ]
        }

        project.initialize(
          _recipe=var_recipe,
          _user=var_user,
          _service=var_service,
          _verbose=True
        )
        project.execute()

    For a full list of tasks see: scripts/*.json
    """

    returncode = EXIT_SUCCESS

    instances = {}
    for task in cls.recipe['tasks']:
      script, task = next(iter(task.items()))

      # count instance per task
      instances.setdefault(script, 0)
      instances[script] += 1

      print('RUNNING TASK:', '%s %d' % (script, instances[script]))

      if _force or cls.force or is_scheduled(cls.recipe, task):
        try:
          python_callable = getattr(
            import_module('starthinker.task.%s.run' % script),
            script
          )
          python_callable(cls.recipe, instances[script])
        except Exception as e:
          print(str(e))
          returncode = EXIT_ERROR
      else:
        print(
          'Schedule Skipping: add --force to ignore schedule or run specific task handler'
        )

    return returncode
```
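For reference, the schedule gate above can be exercised on its own; a small sketch with a hypothetical recipe (the task would run only on Mondays, at hour 9 or 15, in the recipe's timezone):
```python
recipe = {"setup": {"timezone": "America/Los_Angeles", "day": ["Mon"]}}
task = {"hour": [9, 15]}

# is_scheduled reads days from setup.day and hours from the task (or setup.hour)
if is_scheduled(recipe, task):
    print("task is due this hour")
```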
helper.py
```python
###########################################################################
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###########################################################################

import json

from project import project
from util.bigquery import get_schema
from util.bigquery import table_to_schema
from util.bigquery import rows_to_table
from util.csv import csv_to_rows
from util.csv import excel_to_rows


def initialize_project():
  auth = 'service' if project.args.service else 'user'
  schema = json.loads(project.args.data_schema) if project.args.data_schema else None
  print(project.args)

  if project.args.csv:
    with open(project.args.csv, 'r') as csv_file:
      rows = csv_to_rows(csv_file.read())

      if not schema:
        rows, schema = get_schema(rows)
        print('DETECTED SCHEMA', json.dumps(schema))
        print('Please run again with the above schema provided.')
        exit()

      rows_to_table(
        auth,
        project.id,
        project.args.dataset,
        project.args.table,
        rows,
        schema
      )

  elif project.args.excel_workbook and project.args.excel_sheet:
    excel_file = project.args.excel_workbook
    rows = excel_to_rows(excel_file, project.args.excel_sheet)

    if not schema:
      rows, schema = get_schema(rows)
      print('DETECTED SCHEMA', json.dumps(schema))
      print('Please run again with the above schema provided.')
      exit()

    rows_to_table(
      auth,
      project.id,
      project.args.dataset,
      project.args.table,
      rows,
      schema
    )

  else:
    # print schema
    print(json.dumps(
      table_to_schema(
        auth,
        project.id,
        project.args.dataset,
        project.args.table
      ),
      indent=2
    ))
```
```python
###########################################################################
#
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###########################################################################

"""Handler for "bigquery" task in recipes.

One of the oldest tasks in StarThinker, due for a refactor to get_rows and
put_rows. Please test thoroughly when modifying this module.

Includes:
  bigquery_function - generate custom tables or functions
  bigquery_query - execute a query and write results to table ( future get_rows / put_rows )
    table - write query results to a table
    sheet - write query results to a sheet
    view - write query results to a view
  bigquery_run - execute a query without expected return results
  bigquery_storage - read from storage into a table
  bigquery_values - write explicit values to a table ( future get_rows )
"""

from util.project import project
from util.csv import rows_to_type
from util.sheets import sheets_clear
from util.sheets import sheets_write
from util.data import get_rows, put_rows
from util.bigquery import query_to_table, query_to_view, storage_to_table, \
  query_to_rows, json_to_table, rows_to_table, run_query, query_parameters
from util.bigquery.functions import pearson_significance_test
from util.bigquery.us_geography import US_GEOGRAPHY_DATA, US_GEOGRAPHY_SCHEMA


def bigquery_function():
  """Generate custom tables or functions.

  See: scripts/bigquery_function.json
  """

  if project.verbose:
    print('FUNCTION', project.task['function'])

  if project.task['function'] == 'Pearson Significance Test':
    run_query(
      project.task['auth'],
      project.id,
      pearson_significance_test(),
      False,
      project.task['to']['dataset']
    )

  elif project.task['function'] == 'US Geography':
    json_to_table(
      project.task['auth'],
      project.id,
      project.task['to']['dataset'],
      'US_Geography',
      US_GEOGRAPHY_DATA,
      US_GEOGRAPHY_SCHEMA
    )


def bigquery_run():
  """Execute a query without expected return results.

  See: scripts/bigquery_run_query.json
  """

  if project.verbose:
    print('RUN QUERY', project.task['run']['query'])

  run_query(
    project.task['auth'],
    project.id,
    query_parameters(
      project.task['run']['query'],
      project.task['run'].get('parameters')
    ),
    project.task['run'].get('legacy', True)
  )


def bigquery_values():
  """Write explicit values to a table.

  TODO: Replace with get_rows.

  See: scripts/bigquery_run_query.json
  """

  if project.verbose:
    print('VALUES', project.task['from']['values'])

  rows = get_rows(project.task['auth'], project.task['from'])
  rows_to_table(
    project.task['to'].get('auth', project.task['auth']),
    project.id,
    project.task['to']['dataset'],
    project.task['to']['table'],
    rows,
    project.task.get('schema', []),
    0
  )


def bigquery_query():
  """Execute a query and write results to table.

  TODO: Replace with get_rows and put_rows combination.

  See: scripts/bigquery_query.json
       scripts/bigquery_storage.json
       scripts/bigquery_to_sheet.json
       scripts/bigquery_view.json
  """

  if 'table' in project.task['to']:
    if project.verbose:
      print('QUERY TO TABLE', project.task['to']['table'])

    query_to_table(
      project.task['auth'],
      project.id,
      project.task['to']['dataset'],
      project.task['to']['table'],
      query_parameters(
        project.task['from']['query'],
        project.task['from'].get('parameters')
      ),
      disposition=project.task['write_disposition']
        if 'write_disposition' in project.task
        else 'WRITE_TRUNCATE',
      legacy=project.task['from'].get(
        'legacy',
        project.task['from'].get('useLegacySql', True)
      ),  # DEPRECATED: useLegacySql
      target_project_id=project.task['to'].get('project_id', project.id)
    )

  elif 'sheet' in project.task['to']:
    if project.verbose:
      print('QUERY TO SHEET', project.task['to']['sheet'])

    rows = query_to_rows(
      project.task['auth'],
      project.id,
      project.task['from']['dataset'],
      query_parameters(
        project.task['from']['query'],
        project.task['from'].get('parameters')
      ),
      legacy=project.task['from'].get('legacy', True)
    )

    # makes sure types are correct in sheet
    rows = rows_to_type(rows)

    sheets_clear(
      project.task['to'].get('auth', project.task['auth']),
      project.task['to']['sheet'],
      project.task['to']['tab'],
      project.task['to'].get('range', 'A2')
    )

    sheets_write(
      project.task['to'].get('auth', project.task['auth']),
      project.task['to']['sheet'],
      project.task['to']['tab'],
      project.task['to'].get('range', 'A2'),
      rows
    )

  elif 'sftp' in project.task['to']:
    if project.verbose:
      print('QUERY TO SFTP')

    rows = query_to_rows(
      project.task['auth'],
      project.id,
      project.task['from']['dataset'],
      query_parameters(
        project.task['from']['query'],
        project.task['from'].get('parameters')
      ),
      legacy=project.task['from'].get('use_legacy_sql', True)
    )

    if rows:
      put_rows(project.task['auth'], project.task['to'], rows)

  else:
    if project.verbose:
      print('QUERY TO VIEW', project.task['to']['view'])

    query_to_view(
      project.task['auth'],
      project.id,
      project.task['to']['dataset'],
      project.task['to']['view'],
      query_parameters(
        project.task['from']['query'],
        project.task['from'].get('parameters')
      ),
      project.task['from'].get(
        'legacy',
        project.task['from'].get('useLegacySql', True)
      ),  # DEPRECATED: useLegacySql
      project.task['to'].get('replace', False)
    )


def bigquery_storage():
  """Read from storage into a table.

  See: scripts/bigquery_storage.json
  """

  if project.verbose:
    print('STORAGE TO TABLE', project.task['to']['table'])

  storage_to_table(
    project.task['auth'], project.id, project.task['to']['dataset'],
    project.task['to']['table'],
    project.task['from']['bucket'] + ':' + project.task['from']['path'],
    project.task.get('schema', []), project.task.get('skip_rows', 1),
    project.task.get('structure', 'CSV'),
    project.task.get('disposition', 'WRITE_TRUNCATE')
  )


def bigquery_bucket():
  """Read from a storage bucket into a table.

  See: scripts/bigquery_bucket.json
  """

  if project.verbose:
    print('STORAGE TO TABLE', project.task['to']['table'])

  storage_to_table(
    project.task['auth'], project.id, project.task['to']['dataset'],
    project.task['to']['table'],
    project.task['from']['bucket'] + ':' + project.task['from']['path'],
    project.task.get('schema', []), project.task.get('skip_rows', 1),
    project.task.get('structure', 'CSV'),
    project.task.get('disposition', 'WRITE_TRUNCATE')
  )


@project.from_parameters
def bigquery():
  if 'function' in project.task:
    bigquery_function()
  elif 'run' in project.task and 'query' in project.task.get('run', {}):
    bigquery_run()
  elif 'values' in project.task['from']:
    bigquery_values()
  elif 'query' in project.task['from']:
    bigquery_query()
  elif 'bucket' in project.task['from']:
    bigquery_bucket()
  else:
    raise NotImplementedError('The bigquery task has no such handler.')


if __name__ == '__main__':
  bigquery()
```
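Because the handler is decorated with @project.from_parameters, it can also be run straight from the command line using the flags defined in from_commandline (a sketch; the module path and file names are placeholders):
```
python bigquery/run.py recipe.json -s service.json --verbose
```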
main.py
```python
# sys, BQTaskRequest, project and bigquery come from the imports/definitions shown earlier
if __name__ == '__main__':
    args = sys.argv[1:]
    print("-------", args)
    kwarg = {}
    if args:
        for arg in args:
            key, value = arg.split("=")
            kwarg[key] = value
    job_parameters = BQTaskRequest(**kwarg)
    project.from_parameter(request=job_parameters)
    # initialize_project()
    bigquery()
```
```python=
class Singleton(type):
    def __init__(self, name, bases, dic):
        self.__single_instance = None
        super().__init__(name, bases, dic)

    def __call__(cls, *args, **kwargs):
        if cls.__single_instance:
            return cls.__single_instance
        single_obj = cls.__new__(cls)
        single_obj.__init__(*args, **kwargs)
        cls.__single_instance = single_obj
        return single_obj
```
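A quick usage sketch with a hypothetical Config class: every construction after the first returns the cached instance, and later __init__ arguments are silently ignored.
```python
class Config(metaclass=Singleton):
    def __init__(self, value=None):
        self.value = value

a = Config("first")
b = Config("second")
assert a is b
assert b.value == "first"  # the second __init__ never ran
```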
```json=
{
  "setup":{
    "license":"Apache License, Version 2.0",
    "copyright":"Copyright 2020 Google LLC"
  },
  "tasks":[
    { "dataset":{
      "auth":"service",
      "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
      "clear":true
    }},
    { "bigquery": {
      "auth":"service",
      "from":{
        "values":[
          ["2018-02-27", "dog", 7, 67],
          ["2018-03-01", "cat", 5, 1.5],
          ["2018-03-02", "bird", 12, 0.44],
          ["2018-03-03", "lizard", 22, 1],
          ["2018-03-04", "dinosaur", 1600, 273.97]
        ]
      },
      "to":{
        "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
        "table":"BQ_Values"
      },
      "schema":[
        { "name":"Date_Column", "type":"DATE" },
        { "name":"String_Column", "type":"STRING" },
        { "name":"Integer_Column", "type":"INTEGER" },
        { "name":"Float_Column", "type":"FLOAT" }
      ]
    }},
    { "test": {
      "auth":"service",
      "bigquery":{
        "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
        "table":"BQ_Values",
        "schema":[
          { "name":"Date_Column", "type":"DATE" },
          { "name":"String_Column", "type":"STRING" },
          { "name":"Integer_Column", "type":"INTEGER" },
          { "name":"Float_Column", "type":"FLOAT" }
        ],
        "values":[
          ["2018-02-27", "dog", 7, 67.0],
          ["2018-03-01", "cat", 5, 1.5],
          ["2018-03-02", "bird", 12, 0.44],
          ["2018-03-03", "lizard", 22, 1.0],
          ["2018-03-04", "dinosaur", 1600, 273.97]
        ]
      }
    }},
    { "sheets":{
      "auth":"user",
      "sheet":"StarThinker Template: Sheets Test",
      "tab":"Sheet_Template",
      "range":"A1:C",
      "header":true,
      "out":{
        "auth": "service",
        "bigquery":{
          "auth": "service",
          "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
          "table":"Sheet_To_BigQuery",
          "schema":[
            { "name":"Animal", "type":"STRING" },
            { "name":"Age", "type":"INTEGER" },
            { "name":"Weight_lbs", "type":"FLOAT" }
          ]
        }
      }
    }},
    { "test": {
      "auth":"service",
      "bigquery":{
        "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
        "table":"Sheet_To_BigQuery",
        "schema":[
          {"name": "Animal", "type": "STRING"},
          {"name": "Age", "type": "INTEGER"},
          {"name": "Weight_lbs", "type": "FLOAT"}
        ],
        "values":[
          ["dog", 7, 67.0],
          ["cat", 5, 1.5],
          ["bird", 12, 0.44],
          ["lizard", 22, 1.0],
          ["dinosaur", 1600, 273.97]
        ]
      }
    }},
    { "test": {
      "auth":"service",
      "bigquery":{
        "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
        "query":"SELECT * FROM {test_dataset}.Sheet_To_BigQuery;",
        "parameters": {
          "test_dataset": {"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}}
        },
        "values":[
          ["dog", 7, 67.0],
          ["cat", 5, 1.5],
          ["bird", 12, 0.44],
          ["lizard", 22, 1.0],
          ["dinosaur", 1600, 273.97]
        ]
      }
    }},
    { "test": {
      "auth":"service",
      "bigquery":{
        "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
        "query":"SELECT * FROM {test_dataset}.Sheet_To_BigQuery WHERE Animal='dog';",
        "parameters": {
          "test_dataset": {"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}}
        },
        "values":[
          ["dog", 7, 67.0]
        ]
      }
    }},
    { "sheets":{
      "auth":"user",
      "sheet":{"field":{ "name":"test_run_id", "kind":"string", "description":"The sheet to use for the test.", "prefix":"StarThinker Test BigQuery ", "default": "Manual"}},
      "tab":"BigQuery_Test",
      "delete":true,
      "template":{}
    }},
    { "bigquery": {
      "auth":"service",
      "from":{
        "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
        "query":"SELECT * FROM {test_dataset}.Sheet_To_BigQuery WHERE Animal IN ('dog', 'cat') ORDER BY Animal;",
        "parameters": {
          "test_dataset": {"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}}
        }
      },
      "to":{
        "auth": "user",
        "sheet":{"field":{ "name":"test_run_id", "kind":"string", "description":"The sheet to use for the test.", "prefix":"StarThinker Test BigQuery ", "default": "Manual"}},
        "tab":"BigQuery_Test",
        "range":"A1:C"
      }
    }},
    { "test": {
      "auth":"user",
      "sheets": {
        "sheet":{"field":{ "name":"test_run_id", "kind":"string", "description":"The sheet to use for the test.", "prefix":"StarThinker Test BigQuery ", "default": "Manual"}},
        "tab":"BigQuery_Test",
        "range":"A1:C",
        "values":[
          ["cat", 5, 1.5],
          ["dog", 7, 67]
        ]
      }
    }},
    { "drive":{
      "auth":"user",
      "delete":{"field":{ "name":"test_run_id", "kind":"string", "description":"The filename to use for the test.", "prefix":"StarThinker Test BigQuery ", "default": "Manual"}}
    }},
    { "dataset":{
      "auth":"service",
      "dataset":{"field":{ "name":"test_run_id", "kind":"string", "description":"The dataset to use for the test.", "prefix":"StarThinker_Test_BigQuery_", "default": "Manual"}},
      "delete":true
    }}
  ]
}
```
#### API
```python=
import json
recipe_encoded = json.dumps(recipe).encode()
```
Bq Task
```python=
import json
json.loads(recipe.decode())
```
Example round trip. Encoded recipe string passed to the task:
```
recipe={"setup": {"license": "Apache License, Version 2.0", "copyright": "Copyright 2020 Google LLC"}, "tasks": [{"dataset": {"auth": "service", "dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "clear": true}}]}
```
Decoded full recipe on the task side:
```
{'setup': {'license': 'Apache License, Version 2.0', 'copyright': 'Copyright 2020 Google LLC'}, 'tasks': [{'dataset': {'auth': 'service', 'dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}, 'clear': True}}, {'bigquery': {'auth': 'service', 'from': {'values': [['2018-02-27', 'dog', 7, 67], ['2018-03-01', 'cat', 5, 1.5], ['2018-03-02', 'bird', 12, 0.44], ['2018-03-03', 'lizard', 22, 1], ['2018-03-04', 'dinosaur', 1600, 273.97]]}, 'to': {'dataset': 'test', 'table': 'BQ_Values'}, 'schema': [{'name': 'Date_Column', 'type': 'DATE'}, {'name': 'String_Column', 'type': 'STRING'}, {'name': 'Integer_Column', 'type': 'INTEGER'}, {'name': 'Float_Column', 'type': 'FLOAT'}]}}, {'test': {'auth': 'service', 'bigquery': {'dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}, 'table': 'BQ_Values', 'schema': [{'name': 'Date_Column', 'type': 'DATE'}, {'name': 'String_Column', 'type': 'STRING'}, {'name': 'Integer_Column', 'type': 'INTEGER'}, {'name': 'Float_Column', 'type': 'FLOAT'}], 'values': [['2018-02-27', 'dog', 7, 67.0], ['2018-03-01', 'cat', 5, 1.5], ['2018-03-02', 'bird', 12, 0.44], ['2018-03-03', 'lizard', 22, 1.0], ['2018-03-04', 'dinosaur', 1600, 273.97]]}}}, {'sheets': {'auth': 'user', 'sheet': 'StarThinker Template: Sheets Test', 'tab': 'Sheet_Template', 'range': 'A1:C', 'header': True, 'out': {'auth': 'service', 'bigquery': {'auth': 'service', 'dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}, 'table': 'Sheet_To_BigQuery', 'schema': [{'name': 'Animal', 'type': 'STRING'}, {'name': 'Age', 'type': 'INTEGER'}, {'name': 'Weight_lbs', 'type': 'FLOAT'}]}}}}, {'test': {'auth': 'service', 'bigquery': {'dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}, 'table': 'Sheet_To_BigQuery', 'schema': [{'name': 'Animal', 'type': 'STRING'}, {'name': 'Age', 'type': 'INTEGER'}, {'name': 'Weight_lbs', 'type': 'FLOAT'}], 'values': [['dog', 7, 67.0], ['cat', 5, 1.5], ['bird', 12, 0.44], ['lizard', 22, 1.0], ['dinosaur', 1600, 273.97]]}}}, {'test': {'auth': 'service', 'bigquery': {'dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}, 'query': 'SELECT * FROM {test_dataset}.Sheet_To_BigQuery;', 'parameters': {'test_dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}}, 'values': [['dog', 7, 67.0], ['cat', 5, 1.5], ['bird', 12, 0.44], ['lizard', 22, 1.0], ['dinosaur', 1600, 273.97]]}}}, {'test': {'auth': 'service', 'bigquery': {'dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}, 'query': "SELECT * FROM {test_dataset}.Sheet_To_BigQuery WHERE Animal in ('dog',);", 'parameters': {'test_dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}}, 'values': [['dog', 7, 67.0]]}}}, {'sheets': {'auth': 'user', 'sheet': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The sheet to use for the test.', 'prefix': 'StarThinker Test BigQuery ', 'default': 'Manual'}}, 'tab': 'BigQuery_Test', 'delete': True, 'template': {}}}, {'bigquery': {'auth': 'service', 'from': {'dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}, 'query': "SELECT * FROM {test_dataset}.Sheet_To_BigQuery WHERE Animal IN ('dog', 'cat') ORDER BY Animal;", 'parameters': {'test_dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}}}, 'to': {'auth': 'user', 'sheet': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The sheet to use for the test.', 'prefix': 'StarThinker Test BigQuery ', 'default': 'Manual'}}, 'tab': 'BigQuery_Test', 'range': 'A1:C'}}}, {'test': {'auth': 'user', 'sheets': {'sheet': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The sheet to use for the test.', 'prefix': 'StarThinker Test BigQuery ', 'default': 'Manual'}}, 'tab': 'BigQuery_Test', 'range': 'A1:C', 'values': [['cat', 5, 1.5], ['dog', 7, 67]]}}}, {'drive': {'auth': 'user', 'delete': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The filename to use for the test.', 'prefix': 'StarThinker Test BigQuery ', 'default': 'Manual'}}}}, {'dataset': {'auth': 'service', 'dataset': {'field': {'name': 'test_run_id', 'kind': 'string', 'description': 'The dataset to use for the test.', 'prefix': 'StarThinker_Test_BigQuery_', 'default': 'Manual'}}, 'delete': True}}]}
```
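One detail the encode/decode pair above leaves open: json.dumps(recipe).encode() yields bytes, which do not travel cleanly as a single command-line argument to the pod. A possible answer (an assumption, not a team decision) is to base64-encode the recipe for transport:
```python
import base64
import json

recipe = {"setup": {}, "tasks": []}  # placeholder recipe

# API side: produce one token that is safe as a single CLI argument
recipe_arg = base64.b64encode(json.dumps(recipe).encode()).decode()

# task side: reverse it before handing the dict to project.from_parameter
assert json.loads(base64.b64decode(recipe_arg).decode()) == recipe
```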
#### update [20 aug 2022]
```string=
{"setup": {"license": "Apache License, Version 2.0", "copyright": "Copyright 2020 Google LLC"}, "tasks": [{"dataset": {"auth": "service", "dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "clear": true}}, {"bigquery": {"auth": "service", "from": {"values": [["2018-02-27", "dog", 7, 67], ["2018-03-01", "cat", 5, 1.5], ["2018-03-02", "bird", 12, 0.44], ["2018-03-03", "lizard", 22, 1], ["2018-03-04", "dinosaur", 1600, 273.97]]}, "to": {"dataset": "test", "table": "BQ_Values"}, "schema": [{"name": "Date_Column", "type": "DATE"}, {"name": "String_Column", "type": "STRING"}, {"name": "Integer_Column", "type": "INTEGER"}, {"name": "Float_Column", "type": "FLOAT"}]}}, {"test": {"auth": "service", "bigquery": {"dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "table": "BQ_Values", "schema": [{"name": "Date_Column", "type": "DATE"}, {"name": "String_Column", "type": "STRING"}, {"name": "Integer_Column", "type": "INTEGER"}, {"name": "Float_Column", "type": "FLOAT"}], "values": [["2018-02-27", "dog", 7, 67], ["2018-03-01", "cat", 5, 1.5], ["2018-03-02", "bird", 12, 0.44], ["2018-03-03", "lizard", 22, 1], ["2018-03-04", "dinosaur", 1600, 273.97]]}}}, {"sheets": {"auth": "user", "sheet": "StarThinker Template: Sheets Test", "tab": "Sheet_Template", "range": "A1:C", "header": true, "out": {"auth": "service", "bigquery": {"auth": "service", "dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "table": "Sheet_To_BigQuery", "schema": [{"name": "Animal", "type": "STRING"}, {"name": "Age", "type": "INTEGER"}, {"name": "Weight_lbs", "type": "FLOAT"}]}}}}, {"test": {"auth": "service", "bigquery": {"dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "table": "Sheet_To_BigQuery", "schema": [{"name": "Animal", "type": "STRING"}, {"name": "Age", "type": "INTEGER"}, {"name": "Weight_lbs", "type": "FLOAT"}], "values": [["dog", 7, 67], ["cat", 5, 1.5], ["bird", 12, 0.44], ["lizard", 22, 1], ["dinosaur", 1600, 273.97]]}}}, {"test": {"auth": "service", "bigquery": {"dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "query": "SELECT * FROM {test_dataset}.Sheet_To_BigQuery;", "parameters": {"test_dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}}, "values": [["dog", 7, 67], ["cat", 5, 1.5], ["bird", 12, 0.44], ["lizard", 22, 1], ["dinosaur", 1600, 273.97]]}}}, {"test": {"auth": "service", "bigquery": {"dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "query": "SELECT * FROM {test_dataset}.Sheet_To_BigQuery WHERE Animal='dog';", "parameters": {"test_dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}}, "values": [["dog", 7, 67]]}}}, 
{"sheets": {"auth": "user", "sheet": {"field": {"name": "test_run_id", "kind": "string", "description": "The sheet to use for the test.", "prefix": "StarThinker Test BigQuery ", "default": "Manual"}}, "tab": "BigQuery_Test", "delete": true, "template": {}}}, {"bigquery": {"auth": "service", "from": {"dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "query": "SELECT * FROM {test_dataset}.Sheet_To_BigQuery WHERE Animal IN ('dog', 'cat') ORDER BY Animal;", "parameters": {"test_dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}}}, "to": {"auth": "user", "sheet": {"field": {"name": "test_run_id", "kind": "string", "description": "The sheet to use for the test.", "prefix": "StarThinker Test BigQuery ", "default": "Manual"}}, "tab": "BigQuery_Test", "range": "A1:C"}}}, {"test": {"auth": "user", "sheets": {"sheet": {"field": {"name": "test_run_id", "kind": "string", "description": "The sheet to use for the test.", "prefix": "StarThinker Test BigQuery ", "default": "Manual"}}, "tab": "BigQuery_Test", "range": "A1:C", "values": [["cat", 5, 1.5], ["dog", 7, 67]]}}}, {"drive": {"auth": "user", "delete": {"field": {"name": "test_run_id", "kind": "string", "description": "The filename to use for the test.", "prefix": "StarThinker Test BigQuery ", "default": "Manual"}}}}, {"dataset": {"auth": "service", "dataset": {"field": {"name": "test_run_id", "kind": "string", "description": "The dataset to use for the test.", "prefix": "StarThinker_Test_BigQuery_", "default": "Manual"}}, "delete": true}}]}
```
```python
import os
import re
import json
import pytz
import argparse
import textwrap
from datetime import datetime
from importlib import import_module
from util.debug import starthinker_trace_start
EXIT_ERROR = 1
EXIT_SUCCESS = 0
RE_UUID = re.compile(r'(\s*)("setup"\s*:\s*{)')
def get_project(filepath=None, stringcontent=None):
"""Loads json for Project Class. Available as helper.
Able to load JSON with newlines ( strips all newlines before load ).
Args:
- filepath: (string) The local file path to the recipe json file to load.
Returns:
Json of recipe file.
"""
if filepath:
with open(filepath) as recipe_file:
stringcontent = recipe_file.read()
try:
# allow the json to contain newlines for formatting queries and such
return json.loads(stringcontent.replace('\n', ' '))
except ValueError as e:
pos = 0
for count, line in enumerate(stringcontent.splitlines(), 1):
# do not add newlines, e.pos was run on a version where newlines were removed
pos += len(line)
if pos >= e.pos:
e.lineno = count
e.pos = pos
e.args = (
'JSON ERROR: %s LINE: %s CHARACTER: %s ERROR: %s LINE: %s' %
(filepath, count, pos - e.pos, str(e.msg), line.strip()),
)
raise
def utc_to_timezone(timestamp, timezone):
if timestamp:
return timestamp.replace(tzinfo=pytz.utc).astimezone(
pytz.timezone(timezone))
else:
return None
def is_scheduled(recipe, task={}):
"""Wrapper for day_hour_scheduled that returns True if current time zone safe hour is in recipe schedule.
Used as a helper for any cron job running projects. Keeping this logic in
project
helps avoid time zone detection issues and scheduling discrepencies between
machines.
Args:
- recipe: (Recipe JSON) The JSON of a recipe.
- task: ( dictionary / JSON ) The specific task being considered for
execution.
Returns:
- True if task is scheduled to run current hour, else False.
"""
tz = pytz.timezone(
recipe.get('setup', {}).get('timezone', 'America/Los_Angeles')
)
now_tz = datetime.now(tz)
day_tz = now_tz.strftime('%a')
hour_tz = now_tz.hour
days = recipe.get('setup', {}).get('day', [])
hours = [
int(h) for h in task.get(
'hour', recipe.get('setup', {}).get('hour', [])
)
]
if days == [] or day_tz in days:
if hours == [] or hour_tz in hours:
return True
return False
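# Example: with setup {'timezone': 'America/New_York', 'day': ['Mon'], 'hour': [9]}
# is_scheduled returns True only during the 9am hour on Mondays, New York time.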
class project:
"""A singleton that represents the loaded recipe within python scripts.
All access to json scripts within StarThinker must pass through the project
class. It handles parameters, time zones, permissions management, and
scheduling overhead. Task function name must match JSON task name.
Project is meant as the entry point into all StarThinker scripts as follows:
```
from util_test.project import project
@project.from_parameters
def task():
pass # execute work using project.* as data from json
if __name__ == "__main__":
task()
```
Project is meant to be used by a helper.
```
import argparse
from util_test.project import project
if __name__ == "__main__":
# custom parameters
parser = argparse.ArgumentParser()
parser.add_argument('custom', help='custom parameter to be added to
standard project set.')
# initialize project
project.from_commandline(parser=parser)
# access arguments
auth = 'service' if project.args.service else 'user'
print(project.args.custom)
```
Project can also be initialized directly for non-json tasks:
```
from util_test.project import project
if __name__ == "__main__":
var_recipe = '/somepath/recipe.json'
var_user = '/somepath/user.json'
var_service = '/somepath/service.json'
project.initialize(_recipe=var_recipe, _user=var_user,
_service=var_service, _verbose=True)
```
Attributes: Dealing with authentication...
- project: (string) The Google Cloud project id.
- user: (string) Path to the user credentials json file. It can also be a
Google Cloud Bucket path when passed to the class directly.
- service: (string) Path to the service credentials json file. It can
also be a json object when passed to the project class directly.
- client: (string) Path to the client credentials json file. It can only
be a local path. Dealing with execution data...
- instance: (integer) When executing all tasks, it is the one based index
offset of the task to run.
- date: (date) A specific date or 'TODAY', which is changed to today's
date, passed to python scripts for reference.
  - hour: (integer) When executing all tasks, it is the hour if specified
    for a task to execute. Dealing with debugging...
- verbose: (boolean) Prints all debug information in StarThinker code.
See: if project.verbose: print '...'.
- force: (boolean) For recipes with specific task hours, forces all tasks
to run regardless of hour specified.
"""
args = None
recipe = None
task = None
instance = None
filepath = None
verbose = False
force = False
function = None
timezone = None
now = None
date = None
hour = None
@classmethod
def from_commandline(cls, _task=None, parser=None, arguments=None):
"""Used in StarThinker scripts as entry point for command line calls.
Loads json for execution.
Usage example:
```
import argparse
from util_test.project import project
if __name__ == "__main__":
# custom parameters
parser = argparse.ArgumentParser()
parser.add_argument('custom', help='custom parameter to be added to
standard project set.')
# initialize project
project.from_commandline(parser=parser)
# access arguments
auth = 'service' if project.args.service else 'user'
print(project.args.custom)
```
Args:
- parser: (ArgumentParser) optional custom argument parser ( json argument
becomes optional if not None )
- arguments: (String) optional list of parameters to use when invoking
project ( defaults to ALL if set to None )
Returns:
Nothing, this manipulates a singleton object. All calls to project.*
result in the same object.
"""
if parser is None:
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
Command line to execute all tasks in a recipe once. ( Common Entry Point )
This script dispatches all the tasks in a JSON recipe to handlers in sequence.
For each task, it calls a subprocess to execute the JSON instructions, waits
for the process to complete and dispatches the next task, until all tasks are
complete or a critical failure ( exception ) is raised.
If an exception is raised in any task, all following tasks are not executed by design.
Example: python run.py [path to recipe file] --force
Caution: This script does NOT check if the last job finished, potentially causing overruns.
Notes:
- To avoid running the entire script when debugging a single task, the command line
can easily replace "all" with the name of any "task" in the json. For example
python all/run.py project/sample/say_hello.json
- Can be easily replaced with the following to run only the "hello" task:
python task/hello/run.py project/sample/say_hello.json
- Or specified further to run only the second hello task:
python task/hello/run.py project/sample/say_hello.json -i 2
"""))
      # the default parser takes the recipe json as a positional argument
      parser.add_argument('json', help='Path to recipe json file to load.')
    else:
      # a custom parser makes the json argument optional ( see docstring )
      if arguments is None or '-j' in arguments:
        parser.add_argument(
          '--json', '-j', help='Path to recipe json file to load.')
if arguments is None or '-p' in arguments:
parser.add_argument(
'--project',
'-p',
help='Cloud ID of Google Cloud Project.',
default=None)
if arguments is None or '-k' in arguments:
parser.add_argument(
'--key',
'-k',
help='API Key of Google Cloud Project.',
default=None)
if arguments is None or '-u' in arguments:
parser.add_argument(
'--user',
'-u',
help='Path to USER credentials json file.',
default=None)
if arguments is None or '-s' in arguments:
parser.add_argument(
'--service',
'-s',
help='Path to SERVICE credentials json file.',
default=None)
if arguments is None or '-c' in arguments:
parser.add_argument(
'--client',
'-c',
help='Path to CLIENT credentials json file.',
default=None)
if arguments is None or '-i' in arguments:
parser.add_argument(
'--instance',
'-i',
help='Instance number of the task to run ( for tasks with same name ) starting at 1.',
default=1,
type=int)
if arguments is None or '-v' in arguments:
parser.add_argument(
'--verbose',
'-v',
help='Print all the steps as they happen.',
action='store_true')
if arguments is None or '-f' in arguments:
parser.add_argument(
'--force',
'-force',
        help='Not used but included for compatibility with another script.',
action='store_true')
if arguments is None or '-tp' in arguments:
parser.add_argument(
'--trace_print',
'-tp',
help='Execution trace written to stdout.',
action='store_true')
if arguments is None or '-tf' in arguments:
parser.add_argument(
'--trace_file',
'-tf',
help='Execution trace written to file.',
action='store_true')
cls.args, unknown = parser.parse_known_args()
# initialize the project singleton with passed in parameters
    # keyword arguments keep the mapping explicit ( there is no positional
    # slot for the raw json path, it is consumed by get_project above )
    cls.initialize(
      _recipe=get_project(cls.args.json) if hasattr(cls.args, 'json') else {},
      _task=_task,
      _instance=getattr(cls.args, 'instance', None),
      _project=getattr(cls.args, 'project', None),
      _user=getattr(cls.args, 'user', None),
      _service=getattr(cls.args, 'service', None),
      _client=getattr(cls.args, 'client', None),
      _key=getattr(cls.args, 'key', None),
      _verbose=getattr(cls.args, 'verbose', None),
      _force=getattr(cls.args, 'force', None),
      _trace_print=getattr(cls.args, 'trace_print', None),
      _trace_file=getattr(cls.args, 'trace_file', None)
    )
@classmethod
def from_parameter(cls, **parameters):
if 'recipe' in parameters:
parameters['recipe'] = json.loads(parameters['recipe'])
else:
raise ValueError("recipe not found")
cls.initialize(
parameters.get('recipe') if parameters.get('recipe') else {},
parameters.get('task_name', None),
parameters.get('instance', None),
parameters.get('project', None),
parameters.get('user', None),
parameters.get('service', None),
parameters.get('client', None),
parameters.get('key', None),
parameters.get('verbose', None),
parameters.get('force', None),
parameters.get('trace_print', None),
parameters.get('trace_file', None)
)
@classmethod
def get_task_index(cls):
i = 0
for c, t in enumerate(cls.recipe.get('tasks', [])):
if next(iter(t.keys())) == cls.function:
i += 1
if i == cls.instance:
return c
return None
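  # Example: for tasks [{'hello': {}}, {'bigquery': {}}, {'hello': {}}] with
  # function 'hello' and instance 2, get_task_index returns 2 ( zero based ).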
@classmethod
def get_task(cls):
#if cls.task is None:
i = cls.get_task_index()
cls.task = None if i is None else next(
iter(cls.recipe['tasks'][i].values()))
return cls.task
@classmethod
def set_task(cls, function, parameters):
if cls.task is None:
cls.recipe['tasks'].append({function: parameters})
cls.function = function
cls.task = parameters
cls.instance = 1
else:
i = cls.get_task_index()
cls.recipe['tasks'][i] = {function: parameters}
cls.function = function
cls.task = parameters
      # count prior occurrences of this task name ( t is {name: parameters} )
      cls.instance = sum([
        1 for c, t in enumerate(cls.recipe['tasks'])
        if next(iter(t.keys())) == function and c <= i
      ])
@staticmethod
def from_parameters(func):
"""Initializes a project singleton for execution by a task.
Either loads parameters (recipe, instance) passed to task programatically,
or if no parameters passed attmepts to load them from the command line.
Uses decorator pattern, task name is inferred from function ebing decorated.
Args:
- recipe: (dict) JSON object representing the project ( setup plus at
least one task )
- instance: (integer) numeric offset of task to run if multiple calls to
thsi task exist
"""
def from_parameters_wrapper(recipe=None, instance=1):
if recipe:
project.initialize(
_recipe=recipe, _task=func.__name__, _instance=instance)
else:
project.from_commandline(func.__name__)
func()
return from_parameters_wrapper
@classmethod
def initialize(cls,
_recipe={},
_task=None,
_instance=1,
_project=None,
_user=None,
_service=None,
_client=None,
_key=None,
_verbose=False,
_force=False,
_trace_print=False,
_trace_file=False):
"""Used in StarThinker scripts as programmatic entry point.
Set up the project singleton for execution of a task, be sure to mimic
defaults in helper
this function loads credentials from various source ( command line argument,
json, default credentials )
it also sets up time zone aware date and various helper flags such as force
and verbose.
Usage example:
```
from util_test.project import project
if __name__ == "__main__":
user = 'user.json'
service = 'service.json'
recipe = {'setup':..., 'tasks':.... }
project.initialize(
_recipe=recipe,
_user=user,
_service=service,
_verbose=True
)
```
Args:
- _recipe: (dict) JSON object representing the project ( setup plus at
least one task )
      - _task: (string) Task name from the recipe json task list to execute.
- _instance: (integer) See module description.
- _project: (string) See module description.
- _user: (string) See module description.
- _service: (string) See module description.
- _client: (string) See module description.
- _key: (string) See module description.
- _verbose: (boolean) See module description.
- _force: (boolean) See module description.
- _trace_print: (boolean) True if writing execution trace to stdout.
- _trace_file: (boolean) True if writing execution trace to file ( see
config.py ).
Returns:
Nothing, this manipulates a singleton object. All calls to project.*
result in the same object.
"""
starthinker_trace_start(_trace_print, _trace_file)
cls.recipe = _recipe
cls.function = _task
cls.instance = _instance
cls.force = _force
# populates the task variable based on function and instance
cls.get_task()
cls.verbose = _verbose
# add setup to json if not provided and loads command line credentials if given
if 'setup' not in cls.recipe:
cls.recipe['setup'] = {}
if 'auth' not in cls.recipe['setup']:
cls.recipe['setup']['auth'] = {}
if _service:
cls.recipe['setup']['auth']['service'] = _service
if _client:
cls.recipe['setup']['auth']['client'] = _client
    # if user explicitly specified by command line
if _user:
cls.recipe['setup']['auth']['user'] = _user
# or if user not given, then try default credentials ( disabled security risk to assume on behalf of recipe )
#elif not cls.recipe['setup']['auth'].get('user'):
# cls.recipe['setup']['auth']['user'] = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', None)
# if project id given, use it
if _project:
cls.recipe['setup']['id'] = _project
if _key:
cls.recipe['setup']['key'] = _key
# TBD: if project id not given, use service credentials
#elif not cls.recipe['setup'].get('id') and cls.recipe['setup']['auth'].get('service'):
# TBD: if project id not given, use client credentials
#elif not cls.recipe['setup'].get('id') and cls.recipe['setup']['auth'].get('client'):
cls.id = cls.recipe['setup'].get('id')
cls.key = cls.recipe['setup'].get('key')
# find date based on timezone
cls.timezone = pytz.timezone(
cls.recipe['setup'].get(
'timezone',
'America/Los_Angeles'
)
)
cls.now = datetime.now(cls.timezone)
cls.date = cls.now.date()
cls.hour = cls.now.hour
if cls.verbose:
print('TASK:', _task or 'all')
print('DATE:', cls.now.date())
print('HOUR:', cls.now.hour)
@classmethod
def execute(cls, _force=False):
"""Run all the tasks in a project in one sequence.
```
from util_test.project import project
if __name__ == "__main__":
var_user = '[USER CREDENTIALS JSON STRING OR PATH]'
var_service = '[SERVICE CREDENTIALS JSON STRING OR PATH]'
var_recipe = {
"tasks":[
{ "dataset":{
"auth":"service",
"dataset":"Test_Dataset"
}}
]
}
project.initialize(
_recipe=var_recipe,
_user=var_user,
_service=var_service,
_verbose=True
)
project.execute()
```
For a full list of tasks see: scripts/*.json
"""
returncode = EXIT_SUCCESS
instances = {}
for task in cls.recipe['tasks']:
script, task = next(iter(task.items()))
# count instance per task
instances.setdefault(script, 0)
instances[script] += 1
print('RUNNING TASK:', '%s %d' % (script, instances[script]))
if _force or cls.force or is_scheduled(cls.recipe, task):
try:
python_callable = getattr(
import_module('starthinker.task.%s.run' % script),
script
)
python_callable(cls.recipe, instances[script])
except Exception as e:
print(str(e))
returncode = EXIT_ERROR
else:
print(
'Schedule Skipping: add --force to ignore schedule or run specific task handler'
)
return returncode
```
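These appear to be the parameters the worker hands to project.from_parameter; the project id and key path are environment specific examples: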
```python
kwarg = {}
kwarg['task_name'] = 'bigquery'
kwarg['instance'] = 1
kwarg['project'] = 'spidr-coreapp-dev'
kwarg['service'] = '/home/akhil/Desktop/bq-task/key.json'
```
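Continuing the snippet above, a minimal sketch of how these parameters could drive a run, assuming the recipe json is available as a string ( from_parameter raises ValueError without a recipe key; the recipe path here is hypothetical ):
```python
# load the recipe json as a string; from_parameter json.loads it internally
with open('/home/akhil/Desktop/bq-task/recipe.json') as recipe_file:  # hypothetical path
    kwarg['recipe'] = recipe_file.read()

# initialize the singleton from keyword parameters, then run every task in sequence
project.from_parameter(**kwarg)
project.execute(_force=True)
```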
Update [Aug 25th]
Identify where job creation falls and where to write all the code methods.
Inputs needed to create a job: user creds or an identifier ( string.json file ), Account ID, identify.json.
After creating the job, how to pick a recipe: pull recipes from the topic ( single or multiple ).
How to loop through one recipe.
Have a json that has all the details; Daivik will get a payload ( how? ).
Recipe execution:
Fetch the recipe from the topic, loop over its tasks, create a ...
Refresh token from cloud bucket.
payload:
```
"tasks": recipe.tasks,
"job_status": recipe.job_status,
"job_utm": recipe.job_utm,
"worker_utm": recipe.worker_utm,
"worker_uid": recipe.worker_uid,
"active": recipe.active,
"manual": recipe.manual,
"id": recipe.id,
"ingredients": ingredients,
"user_identifier":
"service_json":
```
What's done is not wrong, but it is incomplete.
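For reference, a minimal sketch of publishing such a payload to the single topic, assuming the google-cloud-pubsub publisher client ( the hand written google.cloud.pubsub_v1 surface, not the generated google.pubsub_v1 one used below ); the topic id is hypothetical and the payload is trimmed:
```python
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# hypothetical topic id on the tenant project
topic_path = publisher.topic_path('spidr-coreapp-dev', 'recipe-topic')

# trimmed example payload; the full shape is listed above
payload = {'tasks': [], 'job_status': None, 'id': 1}
future = publisher.publish(topic_path, json.dumps(payload).encode('utf8'))
print('published message id:', future.result())
```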
# 4th OCT
```python=
# pylint:disable=logging-fstring-interpolation
import logging
from typing import Any, List
from google.api_core.exceptions import DeadlineExceeded
from google import pubsub_v1
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PubSubHandler:
@staticmethod
    def receive_messages(
        project_id: str, subscription_id: str, number_of_messages: int = 1000) -> List[Any]:
"""Receives messages from a pull subscription."""
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
logger.info(f"subscription path: {subscription_path}")
try:
# Initialize request argument(s)
request = pubsub_v1.PullRequest({"subscription": subscription_path, "max_messages": number_of_messages})
response = subscriber.pull(request=request)
received_messages = response.received_messages
        except DeadlineExceeded:
            received_messages = []
            logger.info('no messages available before the pull deadline')
return received_messages
@staticmethod
    def ack_messages(project_id: str, subscription_id: str, message_ids: List[str]):
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
if len(message_ids) > 0:
subscriber.acknowledge(subscription=subscription_path, ack_ids=message_ids)
logger.info(f'acknowledged msg_ids: {message_ids}')
return True
return False
```
```python=
PubSubHandler.ack_messages(project_id="", subscription_id=""], message_ids=ack_ids)
```
```python=
def pre_process_messages(pulled_messages):
messages_dict = {}
if len(pulled_messages) < 1:
logger.info('no messages in queue...')
return messages_dict
for message in pulled_messages:
raw_data = message.message.data
decoded_data = raw_data.decode('utf8')
        messages_dict[message.ack_id] = decoded_data
return messages_dict
```