owned this note
owned this note
Published
Linked with GitHub
# tangle accelerator and rabbitmq c client
tangle-accelerator/accelerator/common_core.c
mux.handle("/transaction")
api_send_transfer (accelerator/apis.c)
-> ta_send_transfer (accelerator/common_core.c)
-> -> ta_send_trytes (accelerator/common_core.c)
-> -> -> ta_attach_to_tangle (accelerator/common_core.c)
-> -> -> -> ta_pow (utils/pow.c)
```
const attachToTangle = async (req, callback) => {
if (req.body.minWeightMagnitude > MWM_LIMIT) {
callback(new Error(`Error: 'minWeightMagnitude' exceeds limit of ${MWM_LIMIT}`));
return;
}
const { channel } = req.rabbit;
let messageId;
try {
messageId = await job.create(JSON.stringify(req.body));
} catch (e) {
console.error(e);
callback(e);
return;
}
await channel.assertQueue(INCOMING_QUEUE);
await channel.sendToQueue(INCOMING_QUEUE, asBuffer(req.body), {
messageId,
appId: process.env.AMQP_APP_ID
});
log.info({
queue: INCOMING_QUEUE,
name: 'job-request-ok',
jobId: messageId
});
callback(null, { jobId: messageId });
};
```
https://github.com/iotaledger/powbox/blob/ae003df500e47f4a8f59650f7056d48277a2ec69/src/server/commands.js#L15
rabbitmq-c/examples/amqp_connect_timeout.c
https://github.com/alanxz/rabbitmq-c/blob/master/examples/amqp_connect_timeout.c
```
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,
int channel_max, int frame_max, int heartbeat,
int sasl_method, ...)
```
```
docker-compose run -d -p 5672:5672 rabbitmq
```
https://github.com/iotaledger/powbox
Work Queues
https://www.rabbitmq.com/tutorials/tutorial-two-python.html
* Message acknowledgment
* ack
* In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.
* re-queue
* Message durability
* message persistence
* Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong,
* Fair dispatch
* prefetch_count=1
* it will dispatch it to the next worker that is not still busy.

https://github.com/apre/rabbitmq-c/blob/examples-tutorial/examples/new_task.c
https://github.com/apre/rabbitmq-c/blob/examples-tutorial/examples/worker.c
dcurl_entry
https://github.com/DLTcollab/tangle-accelerator/blob/1f45b35fcaaa5f1fb5e8d4e097c68fe78b4092e0/utils/pow.c#L11
* FPGA dcurl
* threads = 0
* transaction tryes
* mwm
rabbitmq-c/librabbitmq/amqp.h, https://github.com/alanxz/rabbitmq-c/blob/257d2918271e9fa3bf32170dc0d8a49ac323392f/librabbitmq/amqp.h
res = amqp_consume_message(conn, &envelope, NULL, 0);
* amqp_envelope_t envelope;
```
/**
* Envelope object
*
* \since v0.4.0
*/
typedef struct amqp_envelope_t_ {
amqp_channel_t channel; /**< channel message was delivered on */
amqp_bytes_t
consumer_tag; /**< the consumer tag the message was delivered to */
uint64_t delivery_tag; /**< the messages delivery tag */
amqp_boolean_t redelivered; /**< flag indicating whether this message is being
redelivered */
amqp_bytes_t exchange; /**< exchange this message was published to */
amqp_bytes_t
routing_key; /**< the routing key this message was published with */
amqp_message_t message; /**< the message */
} amqp_envelope_t;
```
amqp_basic_publish
```
status = amqp_basic_publish(conn,
1,
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(queueName),
0,
0,
&props,
amqp_cstring_bytes(messagebody));
```
*
amqp_bytes_t
amqp_pool_alloc
Compatibility and Conformance
https://www.rabbitmq.com/specification.html