# FLUSSO DEL NEW INDEX NAME
il metodo **es_client.build_insert_bulk()** viene usato nel metodo **thing_index.insert_bulk_reindex** alla **riga 336**.
def insert_bulk_reindex(self, new_things_index_name, things_document_list):
    """Bulk-insert the given documents into the reindex target index.

    :param new_things_index_name: name of the new index being filled
        while the reindex is in progress
    :param things_document_list: documents to insert into that index
    :return: raw Elasticsearch bulk-API response
    """
    bulk_body = self.es_client.build_insert_bulk(
        {new_things_index_name: things_document_list}
    )
    return self.es_client.bulk(bulk_body)
nel quale gli viene passato il nuovo index.
Esso viene usato in **reindex/lambdafunction.py nel metodo fill_new_indexes()** alla riga **434.**
def fill_new_indexes(new_things_index_name, document_list):
    """Bulk-insert documents into the new things index, collecting ES errors.

    :param new_things_index_name: name of the index created for the reindex
    :param document_list: documents to insert; an empty list is a no-op
    :return: list of human-readable error strings (empty on full success)
    """
    errors = []
    if not document_list:
        return errors
    results = things_index.insert_bulk_reindex(new_things_index_name, document_list)
    for item in results['items']:
        status = item['index']['status']
        # 201 = document created, 200 = document updated (it already existed):
        # both count as success; anything else is reported as an error.
        if status not in (200, 201):
            errors.append(
                'ES error in index {}: {}'.format(item["index"]["_index"], item["index"]['error']['reason']))
    return errors
il **metodo** suddetto viene poi utilizzato nel metodo **reindex** della stessa lambda alla riga **181**
def reindex(event, context):
    """Rebuild the things index by scanning every project and bulk-inserting
    its devices (gateways and things) into a freshly created index.

    The function is re-entrant: when the remaining execution time drops below
    PROCESS_TIMEOUT it re-invokes itself asynchronously, carrying the
    remaining project ids, the pagination tokens and the new index name in
    the event payload, then returns the errors collected so far.

    :param event: lambda event; recognised keys: 'uuid_source_mapping',
        'skip_adjust_alias', 'skip_stop_lambda_event_sources',
        'new_things_index_name', 'project_ids', 'things_next_token',
        'gateways_next_token', 'skip_align_lambda_invoke'
    :param context: lambda context (remaining time / function name)
    :return: list of error strings accumulated during the scan
    """
    if 'uuid_source_mapping' not in event and \
            'skip_adjust_alias' not in event and \
            'skip_stop_lambda_event_sources' not in event:
        # stop the parsing of new events
        lambda_sources = {
            SQS_STREAM_ARN: DD_PARSER_LAMBDA_NAME,
            KINESIS_STREAM_ARN: ES_PARSER_LAMBDA_NAME
        }
        uuid_source_mapping = stop_lambda_event_sources(lambda_sources)
    else:
        uuid_source_mapping = event.get('uuid_source_mapping')
    # reuse the index created by a previous (re-invoked) run, if any
    if 'new_things_index_name' not in event:
        new_things_index_name = things_index.create()
    else:
        new_things_index_name = event['new_things_index_name']
    if 'project_ids' in event:
        project_ids = event['project_ids']
        logger.debug(f"Projects to scan retrieved from the event: {project_ids}")
    else:
        project_ids = projects_details_table.get_project_ids()
        logger.debug(f"Projects to scan retrieved from DynamoDB: {project_ids}")
    if DROP_LAMBDA_LOGS:
        lambdas_index.drop()
    n_projects_to_query = len(project_ids)
    project_ids_to_pass = deepcopy(project_ids)
    errors = []
    things_next_token: Optional[str] = None
    gateways_next_token: Optional[str] = None
    # check whether there's a project to complete
    if 'things_next_token' in event:
        things_next_token = event['things_next_token']
        del event['things_next_token']
    if 'gateways_next_token' in event:
        gateways_next_token = event['gateways_next_token']
        del event['gateways_next_token']
    is_first_cycle: bool = things_next_token is None and gateways_next_token is None
    # main loop for the list of projects
    for project_id in project_ids:
        # main loop for current project
        while is_first_cycle or things_next_token is not None or gateways_next_token is not None:
            try:
                # invokes the same lambda asynchronously with remaining project ids to prevent timeout
                t = context.get_remaining_time_in_millis()
                if t < PROCESS_TIMEOUT:  # 30s is the worst case scenario for a call to IoT Core
                    # BUG FIX: in the pasted source this `if` statement had been
                    # fused into the comment above it, making the block invalid.
                    # If no progress has been made since the last call, something is wrong.
                    if n_projects_to_query == len(project_ids_to_pass) \
                            and things_next_token is None and gateways_next_token is None:
                        logger.error(
                            "Error during reindex, no project has been elaborated before a new asynchronous call")
                        return ["Stall Error"]
                    event['project_ids'] = project_ids_to_pass
                    if things_next_token is not None:
                        event['things_next_token'] = things_next_token
                    if gateways_next_token is not None:
                        event['gateways_next_token'] = gateways_next_token
                    event['uuid_source_mapping'] = uuid_source_mapping
                    event['new_things_index_name'] = new_things_index_name
                    invoke_async_lambda_response = lambda_client.invoke(
                        FunctionName=context.function_name,
                        InvocationType='Event',
                        Payload=json.dumps(event)
                    )
                    logger.debug(
                        f"Invoked requestId function"
                        f"{invoke_async_lambda_response['ResponseMetadata']['RequestId']} with payload: {event}"
                    )
                    return errors
                project_info = projects_details_table.get_project_details_obj(project_id)
                things_list = []
                logger.debug(f"Scanning project: {project_info.to_json()}")
                if is_first_cycle or gateways_next_token is not None:
                    devices = get_devices_from_project(project_info, 'gateway', next_token=gateways_next_token)
                    things_list.extend(devices['list'])
                    gateways_next_token = devices['next_token']
                if is_first_cycle or things_next_token is not None:
                    devices = get_devices_from_project(project_info, 'thing', next_token=things_next_token)
                    things_list.extend(devices['list'])
                    things_next_token = devices['next_token']
                is_first_cycle = False
                errors.extend(fill_new_indexes(new_things_index_name, things_list))
            except Exception as e:
                logger.error("Error for project %s: %s", project_id, e)
                errors.extend(["Error for project {}: {}".format(project_id, str(e))])
                # skip project
                break
        if 'skip_align_lambda_invoke' not in event:
            data_to_insert = {
                'update_mode': ["status", "certificate"],
                'things_list': {
                    project_id: ["*"]
                }
            }
            try:
                lambda_client.invoke(
                    FunctionName=os.environ['ALIGN_LAMBDA_NAME'],
                    InvocationType='Event',
                    Payload=json.dumps(data_to_insert)
                )
                logger.debug(f"Launched things alignment task for {project_id}")
            except Exception:
                # BUG FIX: original message was "... align thing for for project %s"
                # with no argument supplied for the %s placeholder
                logger.exception("Error invoking lambda align thing for project %s", project_id)
                # proceed with the next project (this project stays in
                # project_ids_to_pass, matching the original control flow)
                continue
        project_ids_to_pass.remove(project_id)
        things_next_token = None
        gateways_next_token = None
        is_first_cycle = True
    # all projects have been processed
    if 'skip_adjust_alias' not in event:
        adjust_alias()
    if uuid_source_mapping:
        restore_lambda_event_sources(uuid_source_mapping)
    return errors
**new_things_index_name** viene usato alle righe **94,95, 97 e 150**
il mio obiettivo è usare **new_things_index_name** come altro valore da passare in pancia al metodo custom da lanciare quando la reindex è attiva e abbiamo bisogno di aggiornare, cancellare e aggiungere i record che ci interessano durante la reindex.