owned this note
owned this note
Published
Linked with GitHub
## Requirement
https://github.com/bcgov/DITP/issues/23
This document outlines:
- changes in ACA-Py, leverages upon changes [per tenant settings] made for endpoint. It will add `ACAPY_ELK_HOST`, `ACAPY_ELK_PORT` and `ACAPY_ELK_LOGGER_ALIAS`. `ACAPY_ELK_LOGGER_ALIAS` is the unique human readable identifier which is associated with each tenant. Other details are included below.
- accessing and consuming scoped logs [tenant-ui]
## docker-compose ELK
- https://levelup.gitconnected.com/docker-compose-made-easy-with-elasticsearch-and-kibana-4cb4110a80dd
## Consumer [tenant-ui]
#### Dependancy
https://www.npmjs.com/package/@elastic/elasticsearch
#### Full text search
assuming `tenant_test_123` is the log alias for a particular tenant [same as`ACAPY_ELK_LOGGER_ALIAS`]
To extract `ERROR` logs for `tenant_test_123`, we send the following request:
```
GET /_search
{
"query": {
"query_string": {
"query": "(tenant_test_123) AND ([ERROR])"
}
}
}
```
## Changes in ACA-Py
#### Dependancy
`Python Logstash Async`[https://python-logstash-async.readthedocs.io/en/stable/index.html]
#### Existing code [current code]
1. Profile file [special case]
```
"""Manage Aries-Askar profile interaction."""
...
import logging
...
LOGGER = logging.getLogger(__name__)
class AskarProfile(Profile):
"""Provide access to Aries-Askar profile interaction methods."""
BACKEND_NAME = "askar"
def __init__(
self,
opened: AskarOpenStore,
context: InjectionContext = None,
*,
profile_id: str = None
):
"""Create a new AskarProfile instance."""
super().__init__(context=context, name=opened.name, created=opened.created)
self.opened = opened
self.ledger_pool: IndyVdrLedgerPool = None
self.profile_id = profile_id
self.init_ledger_pool()
self.bind_providers()
...
def init_ledger_pool(self):
"""Initialize the ledger pool."""
if self.settings.get("ledger.disabled"):
LOGGER.info("Ledger support is disabled")
return
if self.settings.get("ledger.genesis_transactions"):
pool_name = self.settings.get("ledger.pool_name", "default")
keepalive = int(self.settings.get("ledger.keepalive", 5))
read_only = bool(self.settings.get("ledger.read_only", False))
socks_proxy = self.settings.get("ledger.socks_proxy")
if read_only:
LOGGER.error("Note: setting ledger to read-only mode")
genesis_transactions = self.settings.get("ledger.genesis_transactions")
cache = self.context.injector.inject_or(BaseCache)
self.ledger_pool = IndyVdrLedgerPool(
pool_name,
keepalive=keepalive,
cache=cache,
genesis_transactions=genesis_transactions,
read_only=read_only,
socks_proxy=socks_proxy,
)
...
```
2. Dispatcher (with access to `profile`)
```
...
import logging
...
LOGGER = logging.getLogger(__name__)
class Dispatcher:
def __init__(self, profile: Profile):
"""Initialize an instance of Dispatcher."""
self.collector: Collector = None
self.profile = profile
self.task_queue: TaskQueue = None
...
def log_task(self, task: CompletedTask):
"""Log a completed task using the stats collector."""
if task.exc_info and not issubclass(task.exc_info[0], HTTPException):
# skip errors intentionally returned to HTTP clients
LOGGER.exception(
"Handler error: %s", task.ident or "", exc_info=task.exc_info
)
if self.collector:
timing = task.timing
if "queued" in timing:
self.collector.log(
"Dispatcher:queued", timing["unqueued"] - timing["queued"]
)
if task.ident:
self.collector.log(task.ident, timing["ended"] - timing["started"])
...
```
#### Updated code [with proposed changes]
1. Profile file [special case]
```
"""Manage Aries-Askar profile interaction."""
...
import logging
from logstash_async.handler import AsynchronousLogstashHandler
from logstash_async.transport import HttpTransport
...
LOG_FORMAT = "%(asctime)s - [%(levelname)s] - %(logger_alias)s - (%(filename)s).%(funcName)s(%(lineno)d) - %(message)s"
class AskarProfile(Profile):
"""Provide access to Aries-Askar profile interaction methods."""
BACKEND_NAME = "askar"
def __init__(
self,
opened: AskarOpenStore,
context: InjectionContext = None,
*,
profile_id: str = None
):
"""Create a new AskarProfile instance."""
super().__init__(context=context, name=opened.name, created=opened.created)
self.opened = opened
self.ledger_pool: IndyVdrLedgerPool = None
self.profile_id = profile_id
self.init_ledger_pool()
self.bind_providers()
if (
self.settings.get("logger.host") and
self.settings.get("logger.port") and
self.settings.get("logger.alias")
):
logging.basicConfig(
level=logging.INFO,
format=LOG_FORMAT,
)
self.logger = logging.getLogger(__name__)
self.logger = logging.LoggerAdapter(
logger,
{"logger_alias": self.settings.get("logger.alias")},
)
transport = HttpTransport(
host,
port,
timeout=5.0,
)
self.logger.addHandler(
AsynchronousLogstashHandler(host, port, transport=transport)
)
else:
self.logger = logging.getLogger(__name__)
...
def init_ledger_pool(self):
"""Initialize the ledger pool."""
if self.settings.get("ledger.disabled"):
self.logger.info("Ledger support is disabled")
return
if self.settings.get("ledger.genesis_transactions"):
pool_name = self.settings.get("ledger.pool_name", "default")
keepalive = int(self.settings.get("ledger.keepalive", 5))
read_only = bool(self.settings.get("ledger.read_only", False))
socks_proxy = self.settings.get("ledger.socks_proxy")
if read_only:
self.logger.error("Note: setting ledger to read-only mode")
genesis_transactions = self.settings.get("ledger.genesis_transactions")
cache = self.context.injector.inject_or(BaseCache)
self.ledger_pool = IndyVdrLedgerPool(
pool_name,
keepalive=keepalive,
cache=cache,
genesis_transactions=genesis_transactions,
read_only=read_only,
socks_proxy=socks_proxy,
)
...
```
2. Dispatcher (with access to `profile`)
For some abstract classes like `aries_cloudagent/indy/verifier.py`
`profile` is only accessible at function level [`check_timestamps`], I will add a utility function for such cases. This function would accept profile as parameter and return a logger with the format and AsynchronousLogstashHandler added.
```
...
import logging
from logstash_async.handler import AsynchronousLogstashHandler
from logstash_async.transport import HttpTransport
...
class Dispatcher:
def __init__(self, profile: Profile):
"""Initialize an instance of Dispatcher."""
self.collector: Collector = None
self.profile = profile
self.task_queue: TaskQueue = None
if (
self.settings.get("logger.host") and
self.settings.get("logger.port") and
self.settings.get("logger.alias")
):
logging.basicConfig(
level=logging.INFO,
format=LOG_FORMAT,
)
self.logger = logging.getLogger(__name__)
self.logger = logging.LoggerAdapter(
logger,
{"logger_alias": self.settings.get("logger.alias")},
)
transport = HttpTransport(
host,
port,
timeout=5.0,
)
self.logger.addHandler(
AsynchronousLogstashHandler(host, port, transport=transport)
)
else:
self.logger = logging.getLogger(__name__)
...
def log_task(self, task: CompletedTask):
"""Log a completed task using the stats collector."""
if task.exc_info and not issubclass(task.exc_info[0], HTTPException):
# skip errors intentionally returned to HTTP clients
self.logger.exception(
"Handler error: %s", task.ident or "", exc_info=task.exc_info
)
if self.collector:
timing = task.timing
if "queued" in timing:
self.collector.log(
"Dispatcher:queued", timing["unqueued"] - timing["queued"]
)
if task.ident:
self.collector.log(task.ident, timing["ended"] - timing["started"])
...
```
3. Other files (without access to `profile`)
- like `aries_cloudagent/core/protocol_registry.py`
Unable to log at tenant level and post logs to ELK.
- Posting logs to ELK [log access] can be provided but at root level. I think this can be achieved by writing logger/ELK based settings to file so that they are accessible.