Mons
    • Create new note
    • Create a note from template
      • Sharing URL Link copied
      • /edit
      • View mode
        • Edit mode
        • View mode
        • Book mode
        • Slide mode
        Edit mode View mode Book mode Slide mode
      • Customize slides
      • Note Permission
      • Read
        • Only me
        • Signed-in users
        • Everyone
        Only me Signed-in users Everyone
      • Write
        • Only me
        • Signed-in users
        • Everyone
        Only me Signed-in users Everyone
      • Engagement control Commenting, Suggest edit, Emoji Reply
    • Invite by email
      Invitee

      This note has no invitees

    • Publish Note

      Share your work with the world Congratulations! 🎉 Your note is out in the world Publish Note

      Your note will be visible on your profile and discoverable by anyone.
      Your note is now live.
      This note is visible on your profile and discoverable online.
      Everyone on the web can find and read all notes of this public team.
      See published notes
      Unpublish note
      Please check the box to agree to the Community Guidelines.
      View profile
    • Commenting
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
      • Everyone
    • Suggest edit
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
    • Emoji Reply
    • Enable
    • Versions and GitHub Sync
    • Note settings
    • Note Insights
    • Engagement control
    • Transfer ownership
    • Delete this note
    • Save as template
    • Insert from template
    • Import from
      • Dropbox
      • Google Drive
      • Gist
      • Clipboard
    • Export to
      • Dropbox
      • Google Drive
      • Gist
    • Download
      • Markdown
      • HTML
      • Raw HTML
Menu Note settings Versions and GitHub Sync Note Insights Sharing URL Create Help
Create Create new note Create a note from template
Menu
Options
Engagement control Transfer ownership Delete this note
Import from
Dropbox Google Drive Gist Clipboard
Export to
Dropbox Google Drive Gist
Download
Markdown HTML Raw HTML
Back
Sharing URL Link copied
/edit
View mode
  • Edit mode
  • View mode
  • Book mode
  • Slide mode
Edit mode View mode Book mode Slide mode
Customize slides
Note Permission
Read
Only me
  • Only me
  • Signed-in users
  • Everyone
Only me Signed-in users Everyone
Write
Only me
  • Only me
  • Signed-in users
  • Everyone
Only me Signed-in users Everyone
Engagement control Commenting, Suggest edit, Emoji Reply
  • Invite by email
    Invitee

    This note has no invitees

  • Publish Note

    Share your work with the world Congratulations! 🎉 Your note is out in the world Publish Note

    Your note will be visible on your profile and discoverable by anyone.
    Your note is now live.
    This note is visible on your profile and discoverable online.
    Everyone on the web can find and read all notes of this public team.
    See published notes
    Unpublish note
    Please check the box to agree to the Community Guidelines.
    View profile
    Engagement control
    Commenting
    Permission
    Disabled Forbidden Owners Signed-in users Everyone
    Enable
    Permission
    • Forbidden
    • Owners
    • Signed-in users
    • Everyone
    Suggest edit
    Permission
    Disabled Forbidden Owners Signed-in users Everyone
    Enable
    Permission
    • Forbidden
    • Owners
    • Signed-in users
    Emoji Reply
    Enable
    Import from Dropbox Google Drive Gist Clipboard
       owned this note    owned this note      
    Published Linked with GitHub
    Subscribed
    • Any changes
      Be notified of any changes
    • Mention me
      Be notified of mention me
    • Unsubscribe
    Subscribe
    # Создаём с нуля высоконагруженное приложение на Tarantool Mons Anderson Mail.ru Cloud Solutions В 2013 я пришел в Mail.ru Group, и я решал задачу, в которой мне нужна была очередь. Есть много разных инструментов для построения очередей, но я решил для начала узнать, что уже имеется в компании. Услышал, что есть такой продукт — Tarantool. Узнал, как он устроен, и мне показалось, что в него отлично может быть встроен брокер очередей. Я пошёл к главному по Tarantool — Косте Осипову — и постарался объяснить, что я хочу получить. Предполагалось, что код очереди будет написан на C, как и остальной код Tarantool, но... На следующий день Костя дал мне [скрипт](https://github.com/mailru/tntlua/commit/f879dfb6981dc82287b7243074ca6cc9c6038369) на 250 строк, который реализовывал почти всё, что я хотел. С того момента я влюбился в Tarantool. Оказалось, что можно написать совсем немного кода на очень простом скриптовом языке и получить совершенно новую для этой СУБД функциональность. Прошло много времени, Tarantool развивался, в том числе и под влиянием наших запросов, но основные идеи и подходы сохранились. Я расскажу, как реализовать собственную очередь на современном Tarantool, например версии 2.2. <cut /> На тот момент я был знаком с несколькими реализациями очередей, и мне нравился простой и быстрый Beanstalkd. У него довольно удобный интерфейс, отслеживание состояния задачи по соединению (обрыв клиента возвращал задачу в очередь), а также удобные возможности по работе с отложенными задачами. При реализации очереди мне хотелось получить что-то подобное. Сам сервис можно представить следующим образом: у нас есть процесс брокера очереди, который принимает и хранит задачи; есть клиенты: продюсеры, которые приносят задачи (метод `put`); и консюмеры, которые берут задачи в работу (метод `take`). ```graphviz digraph test { rankdir=LR //ranksep = "1.2 equally"; ranksep = 1.3 nodesep = 0.1 subgraph cluster_p { label="Producers" graph [style=dotted]; { rank=same; p1; p2; p3; } edge [style=invis,dir=none,constraint=true, len=1]; p1 -> p2; p2 -> p3; } subgraph cluster_q { label="Broker" node [group=queue]; rank=same; q [label="queue";style=bold;shape=cylinder]; } subgraph cluster_c { label="Consumers" graph [style=dotted]; edge [style=invis,dir=none,constraint=true]; weight=20; { rank=same; node [group=cons]; c1; c2; c3; } c1 -> c2; c2 -> c3; } { edge [constraint=false]; weight=0; p1->q [constraint=true, label="put()"]; p2->q [constraint=true, label="put()"]; p3->q [constraint=true, label="put()"]; } { edge [constraint=false]; weight=0; q->c1 [constraint=true, label="take()"]; q->c2 [constraint=true, label="take()"]; q->c3 [constraint=true, label="take()"]; } } ``` Жизненный цикл одной задачи можно описать следующей схемой. Задача появляется при помощи метода `put` и переходит в состояние ready. Операция `take` переводит задачу в taken. Из taken задача может быть обработана (`ack`) и удалена, или возвращена в ready (`release`). ```graphviz digraph onenode { { rank=same; "-"; "x"; } { rank=same; ready; taken; } "-" -> ready [label=".put()"] ready -> taken [label=".take()" ] taken -> ready [label=".release()"] taken -> "x" [label=".ack()"] } ``` Также мы можем расширить эту диаграмму и ввести дополнительно отложенную обработку задач: ```graphviz digraph onenode { { rank=same; "-"; "x"; } { rank=same; ready; taken; } "-" -> ready [label=".put()",weight=10] "-" -> wait [label=".put(+delay)"] wait -> ready [label="~time"] ready -> taken [label=".take()" ] taken -> ready [label=".release()"] taken -> wait [label=".release(+delay)"] taken -> "x" [label=".ack()"] } ``` ## Подготовка окружения Tarantool сегодня — это, в том числе, LuaJIT-интерпретатор. Чтобы начать с ним работать, нужно создать стартовый файл init.lua, точку входа, и прописать там вызов `box.cfg()`, который запускает внутренности СУБД. Для локальной разработки осталось только подключить и запустить консоль. Затем создайте вот такой файл и запустите его: ```lua require'strict'.on() box.cfg{} require'console'.start() os.exit() ``` Консоль интерактивная, в ней сразу можно что-то делать. Не нужно долго и много устанавливать и настраивать инструменты, разбираться в них. Просто пишете 10-15 строчек на любой локальной машине Также я рекомендую сразу включить strict. Язык Lua довольно свободен в объявлении переменных, и этот режим должен немного помочь вам при ошибках. Кстати, если собрать Tarantool самому в режиме `DEBUG`, то `strict` будет включён по умолчанию. Далее остаётся только запустить наш файл при помощи `tarantool`: ```shell tarantool init.lua ``` Вы должны увидеть что-то похожее: ``` 2020-07-09 20:00:11.344 [30043] main/102/init.lua C> Tarantool 2.2.3-1-g98ecc909a 2020-07-09 20:00:11.345 [30043] main/102/init.lua C> log level 5 2020-07-09 20:00:11.346 [30043] main/102/init.lua I> mapping 268435456 bytes for memtx tuple arena... 2020-07-09 20:00:11.347 [30043] main/102/init.lua I> mapping 134217728 bytes for vinyl tuple arena... 2020-07-09 20:00:11.370 [30043] main/102/init.lua I> instance uuid 38c59892-263e-42de-875c-8f67539191a3 2020-07-09 20:00:11.371 [30043] main/102/init.lua I> initializing an empty data directory 2020-07-09 20:00:11.408 [30043] main/102/init.lua I> assigned id 1 to replica 38c59892-263e-42de-875c-8f67539191a3 2020-07-09 20:00:11.408 [30043] main/102/init.lua I> cluster uuid 7723bdf4-24e8-4957-bd6c-6ab502a1911c 2020-07-09 20:00:11.425 [30043] snapshot/101/main I> saving snapshot `./00000000000000000000.snap.inprogress' 2020-07-09 20:00:11.437 [30043] snapshot/101/main I> done 2020-07-09 20:00:11.439 [30043] main/102/init.lua I> ready to accept requests 2020-07-09 20:00:11.439 [30043] main/104/checkpoint_daemon I> scheduled next checkpoint for Thu Jul 9 21:11:59 2020 tarantool> ``` ### Пишем очередь Создадим отдельный файл `queue.lua` для написания нашего приложения. Конечно, можно было бы писать всё прямо в `init.lua`, но работать с отдельным файлом будет удобнее. Подключим `queue` в виде модуля из файла `init.lua`: ```lua require'strict'.on() box.cfg{} queue = require 'queue' require'console'.start() os.exit() ``` Все дальнейшие модификации мы будем делать в `queue.lua`. Поскольку мы делаем очередь, нам понадобится где-то хранить информацию о задачах. Создадим спейс (space) — таблицу для данных. Можно создавать его без опций, но мы сразу кое-что добавим. Чтобы нормально перезапускаться, мы укажем, что спейс нужно создавать только в том случае, если он не существует (`if_not_exists`). Также в современном Tarantool можно указывать формат полей с описанием содержимого (лучше так и делать). Так мы и поступим. Под очередь возьмём совсем простую структуру. Мне понадобятся только `id` задач, их статусы и какие-то произвольные данные. Мне не важно, что там будет лежать. Работать с данными без первичного индекса нельзя, поэтому сразу создадим индекс по `id`. Проверяйте, чтобы тип поля совпадал и в формате, и в индексе. ```lua box.schema.create_space('queue',{ if_not_exists = true; }) box.space.queue:format( { { name = 'id'; type = 'number' }, { name = 'status'; type = 'string' }, { name = 'data'; type = '*' }, } ); box.space.queue:create_index('primary', { parts = { 1,'number' }; if_not_exists = true; }) ``` Объявим глобальную таблицу `queue`, которая будет нести в себе наши функции, атрибуты и методы. И для начала объявим две функции: положить задачу (`put`) и взять задачу (`take`). У задач в очереди будут состояния. Для обозначения статуса заведём отдельную таблицу со статусом. В качестве значения можно использовать числа или строки, но я люблю использовать однобуквенные значения: их можно выбрать семантически значимыми и они занимают минимум места при хранении. Для начала сделаем два статуса: `R=READY` и `T=TAKEN`. ```lua local queue = {} local STATUS = {} STATUS.READY = 'R' STATUS.TAKEN = 'T' function queue.put(...) end function queue.take(...) end return queue ``` Как сделать `put`? Очень просто. Нам нужно сгенерировать `id` и вставить данные в спейс со статусом `READY`. Есть много разных способов генерирования идентификатора, мы возьмём `clock.realtime`. Для очереди он хорош тем, что автоматически определяется порядок сообщений (но учтите, что часы могут жёстко переводиться, и в этом случае порядок задач может быть нарушен). Также, теоретически, может возникнуть ситуация, когда в очереди уже будет задача с таким же значением. Поэтому можно посмотреть, нет ли задачи с таким `id`, и в случае коллизии добавить единицу. На это уйдут микросекунды, и это крайне маловероятная ситуация, поэтому производительность не пострадает. Все аргументы функции мы просто вставляем в качестве начинки в нашу задачу: ```lua local clock = require 'clock' function gen_id() local new_id repeat new_id = clock.realtime64() until not box.space.queue:get(new_id) return new_id end function queue.put(...) local id = gen_id() return box.space.queue:insert{ id, STATUS.READY, { ... } } end ``` Как только мы написали функцию `put`, можем перезапустить Tarantool и сразу вызвать эту функцию. Видим, что задача положена в очередь, она выглядит как тапл (кортеж). В него можно класть произвольные данные и даже вложенные структуры. Таплы, в которых Tarantool хранит данные, упаковываются в MessagePack, что позволяет сохранять такие структуры. ```lua tarantool> queue.put("hello") --- - [1594325382148311477, 'R', ['hello']] ... tarantool> queue.put("my","data",1,2,3) --- - [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]] ... tarantool> queue.put({ complex = { struct = "data" }}) --- - [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]] ... ``` Всё, что мы кладём, находится в спейсе. Можно взять команды спейса и посмотреть, что там лежит. ```yaml tarantool> box.space.queue:select() --- - - [1594325382148311477, 'R', ['hello']] - [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]] - [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]] ... ``` Теперь нужно научиться брать задачи — сделаем функцию `take`. Для этого поработаем со статусом. Мы берем те задачи, которые готовы к обработке, то есть находятся в статусе `READY`. Можно было бы, конечно, пройтись по первичному ключу и найти первую готовую задачу, но в условиях нагрузки и большого количества обрабатываемых задач этот сценарий нам не подойдёт. Нужен отдельный индекс по полю статуса. Одна из основных черт Tarantool, которые отличают его от key-value баз, это возможность создавать различные индексы, почти как в реляционных базах: на разные поля, композитные, разного типа. Создадим второй индекс, в котором укажем, что первое поле — это статус. По нему и будем искать. А второе поле — это `id`. Он упорядочит по возрастанию задачи в рамках одного статуса. ```lua box.space.queue:create_index('status', { parts = { 2, 'string', 1, 'number' }; if_not_exists = true; }) ``` Возьмём встроенные функции для выборки. Есть специальный итератор, который применяется к спейсу как `pairs`. В него мы передаем часть ключа. Здесь мы сталкиваемся с составным индексом, который состоит из двух полей: ищем по первому, а упорядочиваем по второму. Говорим системе найти нам таплы, которые по первой части индекса равняются статусу `READY`. И будем их получать уже упорядоченными по второй части индекса. Если мы что-то нашли, то берём задачу, обновляем и возвращаем. Обновляем для того, чтобы никто другой, кто придет с таким же вызовом `take`, не взял её. Если задач нет, то возвращаем ничего. ```lua function queue.take() local found = box.space.queue.index.status :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1) if found then return box.space.queue :update( {found.id}, {{'=', 2, STATUS.TAKEN }}) end return end ``` Также стоит обратить внимание на то, что первый уровень тапла в Tarantool представлен в виде массива. Он не имеет имен, он нумерованный, поэтому при таких операциях, как `update`, до совсем недавнего времени нужно было указывать номер поля. В качестве вспомогательного элемента сделаем табличку, в которой сопоставим имя поля и номер. Для формирования этой таблицы мы можем воспользоваться уже описанным нами форматом: ```lua local F = {} for no,def in pairs(box.space.queue:format()) do F[no] = def.name F[def.name] = no end ``` Для большей наглядности можем поправить описание индексов: ```lua box.space.queue:format( { { name = 'id'; type = 'number' }, { name = 'status'; type = 'string' }, { name = 'data'; type = '*' }, } ); local F = {} for no,def in pairs(box.space.queue:format()) do F[no] = def.name F[def.name] = no end box.space.queue:create_index('primary', { parts = { F.id, 'number' }; if_not_exists = true; }) box.space.queue:create_index('status', { parts = { F.status, 'string', F.id, 'number' }; if_not_exists = true; }) ``` Теперь можно реализовать `take` целиком: ```lua function queue.take(...) for _,t in box.space.queue.index.status :pairs({ STATUS.READY },{ iterator='EQ' }) do return box.space.queue:update({t.id},{ { '=', F.status, STATUS.TAKEN } }) end return end ``` Проверим, как это работает. Положим одну задачу и вызовем `take` дважды. Если к этому моменту у нас есть данные в спейсе, можем его очистить командой `box.space.queue:truncate()`: ``` tarantool> queue.put("my","data",1,2,3) --- - [1594325927025602515, 'R', ['my', 'data', 1, 2, 3]] ... tarantool> queue.take() --- - [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]] ... tarantool> queue.take() --- ... ``` Первый `take` возвращает нам ту самую задачу, которую мы положили. А когда вызовем `take` повторно, то больше ничего не вернется, потому что ready-задач (в статусе `R`) больше нет. Можем убедиться в этом, выполнив `select` из спейса: ``` tarantool> box.space.queue:select() --- - - [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]] ... ``` Потребитель, который берет задачу, должен либо подтвердить её обработку, либо вернуть без обработки, если по какой-либо причине не справляется. Тогда задачу сможет взять кто-то другой. Реализуем для этого две функции: `ack` и `release`. Они принимают `id` задачи и ищут её. Если у задачи статус взятой, то мы обрабатываем её. Эти функции очень похожи. Одна удаляет обработанные задачи, другая возвращает их в статус `ready`. ```lua function queue.ack(id) local t = assert(box.space.queue:get{id},"Task not exists") if t and t.status == STATUS.TAKEN then return box.space.queue:delete{t.id} else error("Task not taken") end end function queue.release(id) local t = assert(box.space.queue:get{id},"Task not exists") if t and t.status == STATUS.TAKEN then return box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }}) else error("Task not taken") end end ``` Посмотрим, как это работает со всеми четырьмя функциями. Кладём две задачи и берём первую, затем освобождаем её. Она возвращается обратно в статус `R`. Второй вызов `take` берёт ту же задачу. Если мы её обработаем, она удалится. Третий вызов `take` возьмёт уже вторую задачу. Порядок соблюдается. Если задача взята, то она не выдается ещё кому-нибудь. ``` tarantool> queue.put("task 1") --- - [1594326185712343931, 'R', ['task 1']] ... tarantool> queue.put("task 2") --- - [1594326187061434882, 'R', ['task 2']] ... tarantool> task = queue.take() return task --- - [1594326185712343931, 'T', ['task 1']] ... tarantool> queue.release(task.id) --- - [1594326185712343931, 'R', ['task 1']] ... tarantool> task = queue.take() return task --- - [1594326185712343931, 'T', ['task 1']] ... tarantool> queue.ack(task.id) --- - [1594326185712343931, 'T', ['task 1']] ... tarantool> task = queue.take() return task --- - [1594326187061434882, 'T', ['task 2']] ... tarantool> queue.ack(task.id) --- - [1594326187061434882, 'T', ['task 2']] ... tarantool> task = queue.take() return task --- - null ... ``` Получилась корректно работающая очередь. Мы уже можем написать потребителя, который будет обрабатывать задачи. Но у него есть, как минимум, одна проблема. Когда мы вызываем `take`, функция сразу возвращает либо задачу, либо пустую строку. Если написать цикл обработки задач и запустить его, то работать он будет, но вхолостую, ничего не делая, просто потребляя CPU. ```lua while true do local task = queue.take() if task then -- ... end end ``` Чтобы это исправить, нам понадобится примитив «канал» (или `channel`). Он позволяет передавать сообщения. По сути, это FIFO-очередь для общения между файберами. У нас есть файбер, который кладет задачи, когда мы приходим в базу данных по сети или работаем с ней из консоли. В файбере исполняется наш Lua-код, ему нужно через какой-то примитив сообщить другому файберу, который ждёт задачи, что появилась новая. Канал работает так: в нем может быть буфер на N слотов, в которые можно положить сообщение, даже если никто не читает из канала. Также можно создать канал без буферной ёмкости, тогда положить можно будет только в те слоты, которые кто-то ждет. Например, мы создаем канал на два буферных элемента. В нем два слота под `put`. Если на канале будет ожидать один потребитель, он создаст третий слот под `put`. Если мы будем класть сообщения в этот канал, то три операции `put` выполнятся без блокировки, а четвертый `put` заблокирует нам тот файбер, который кладет в этот канал. Это позволяет организовать межфайберное взаимодействие. Если вдруг вы знакомы с каналами в Go, то там они фактически такие же: ![](https://hb.bizmrg.com/trains/appdev/channel2.svg) Немного переделаем нашу функцию `take`. Сначала добавим новый аргумент — таймаут: мы готовы ждать задачу в течение определённого времени. Сделаем цикл, который будет искать готовую задачу. Если не найдёт, то будет вычислять, сколько времени ему осталось ждать. Создадим канал, который будет ждать с этим таймаутом. Ели файбер «спит» в ожидании на канале, то его можно разбудить извне, передав сообщение в этот канал. ```lua local fiber = require 'fiber' queue._wait = fiber.channel() function queue.take(timeout) if not timeout then timeout = 0 end local now = fiber.time() local found while not found do found = box.space.queue.index.status :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1) if not found then local left = (now + timeout) - fiber.time() if left <= 0 then return end queue._wait:get(left) end end return box.space.queue :update( {found.id}, {{'=', F.status, STATUS.TAKEN }}) end ``` Итого: `take` пытается взять задачу, если получилось, то возвращает её. Но если задачи не нашлось, то можно подождать в течение остатка таймаута. Причем другая сторона, которая будет производить задачу, может этот файбер разбудить. Чтобы удобно было проводить различные тесты, можем в файле `init.lua` подключить модуль `fiber` глобально: ```lua fiber = require 'fiber' ``` Давайте посмотрим, как это будет работать без пробуждения файбера. В отдельном файбере положим задачу через 0,1 с. То есть сначала очередь пустая, а через 0,1 с. после запуска появляется задача. При этом вызов `take` сделаем с таймаутом 3. После запуска `take` попытается найти задачу. Не найдя её, он уснёт на 3 с. Затем проснётся, снова поищет и найдёт задачу. ```lua tarantool> do box.space.queue:truncate() fiber.create(function() fiber.sleep(0.1) queue.put("task 3") end) local start = fiber.time() return queue.take(3), { wait = fiber.time() - start } end --- - [1594326905489650533, 'T', ['task 3']] - wait: 3.0017817020416 ... ``` Теперь сделаем так, чтобы `take` просыпался при появлении задачи. Для этого возьмем старую функцию `put` и добавим в нее отправку сообщения в канал. В качестве сообщения можно отправить что угодно, пусть в этом случае будет `true`. Ранее я показывал, что `put` может заблокироваться, если в канале недостаточно места. При этом производителю задач не важно, есть с той стороны потребители или нет. Он не должен блокироваться в ожидании потребителя. Поэтому логично поставить здесь нулевой таймаут на блокировку. Если там есть потребители, то есть те, кому нужно сообщить о новой задаче, мы его разбудим. Иначе у нас сообщение в этот канал не положится. Или, в качестве альтернативного варианта, можно проверить, есть ли у канала активные читатели. ```lua function queue.put(...) local id = gen_id() if queue._wait:has_readers() then queue._wait:put(true,0) end return box.space.queue:insert{ id, STATUS.READY, { ... } } end ``` После этого тот же самый код `take` начнет работать совершенно иначе. Мы создаём задачу через 0,1 с. и `take` сразу же просыпается и получает её. Мы избавились от горячего цикла, который непрерывно висел в ожидании задачи. Если мы не положим задачу, то файбер будет ждать три секунды. ```lua tarantool> do box.space.queue:truncate() fiber.create(function() fiber.sleep(0.1) queue.put("task 4") end) local start = fiber.time() return queue.take(3), { wait = fiber.time() - start } end --- - [1594327004302379957, 'T', ['task 4']] - wait: 0.10164666175842 ... ``` На текущий момент мы протестировали работу внутри экземпляра, теперь поработаем по сети. В первую очередь нам нужно наш сервер сделать сервером. Добавим в файле `init.lua` в `box.cfg` опцию `listen` — порт, на котором он будет слушать. Вместе с этим нам понадобится сделать разрешения. Не будем сейчас детально рассматривать настройку привилегий, сделаем так, чтобы любое подключение имело привилегии на исполнение. Про права вы можете почитать [отдельно](https://www.tarantool.io/en/doc/latest/book/box/authentication/). ```lua require'strict'.on() fiber = require 'fiber' box.cfg{ listen = '127.0.0.1:3301' } box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true }) queue = require 'queue' require'console'.start() os.exit() ``` Создадим клиент-`producer` для генерирования задач. В поставке Tarantool уже есть модуль, который позволяет подключаться к другому Tarantool. ```lua #!/usr/bin/env tarantool if #arg < 1 then error("Need arguments",0) end local netbox = require 'net.box' local conn = netbox.connect('127.0.0.1:3301') local yaml = require 'yaml' local res = conn:call('queue.put',{unpack(arg)}) print(yaml.encode(res)) conn:close() ``` ```shell $ tarantool producer.lua "hi" --- [1594327270675788959, 'R', ['hi']] ... ``` Потребитель (`consumer`) будет подключаться, вызывать `take` с таймаутом и обрабатывать результат. Если он получил задачу, то будем печатать её и освобождать. Мы сейчас пока не будем обрабатывать. Допустим, задача получена. ```lua #!/usr/bin/env tarantool local netbox = require 'net.box' local conn = netbox.connect('127.0.0.1:3301') local yaml = require 'yaml' while true do local task = conn:call('queue.take', { 1 }) if task then print("Got task: ", yaml.encode(task)) conn:call('queue.release', { task.id }) else print "No more tasks" end end ``` Но при попытке освободить задачу у нас произойдёт какая-то фигня. ```shell $ tarantool consumer.lua Got task: --- [1594327270675788959, 'T', ['hi']] ... ER_EXACT_MATCH: Invalid key part count in an exact match (expected 1, got 0) ``` Давайте разберёмся. При повторной попытке исполнения потребителя мы обнаружим, что при предыдущем запуске он задачу взял, но не смог вернуть: у него произошла ошибка и задача застряла. Такие задачи больше никто не сможет взять, но и некому их вернуть, потому что код, который их брал, завершился. ```shell $ tarantool consumer.lua No more tasks No more tasks ``` С помощью `select` можно увидеть, что задачи взяты. ``` tarantool> box.space.queue:select() --- - - [1594327004302379957, 'T', ['task 3']] - [1594327270675788959, 'T', ['hi']] ... ``` Здесь есть сразу несколько проблем, поэтому давайте начнём с автоматического освобождения задач при отключении клиента. В Tarantool есть триггеры на подключение и отключение клиентов. Если мы их добавим, то сможем о фактах подключения и отключения. ```lua local log = require 'log' box.session.on_connect(function() log.info( "connected %s from %s", box.session.id(), box.session.peer() ) end) box.session.on_disconnect(function() log.info( "disconnected %s from %s", box.session.id(), box.session.peer() ) end) ``` ``` 2020-07-09 20:52:09.107 [32604] main/115/main I> connected 2 from 127.0.0.1:36652 2020-07-09 20:52:10.260 [32604] main/116/main I> disconnected 2 from nil 2020-07-09 20:52:10.823 [32604] main/116/main I> connected 3 from 127.0.0.1:36654 2020-07-09 20:52:11.541 [32604] main/115/main I> disconnected 3 from nil ``` Есть понятие `session id`, и можно узнать, с какого IP было подключение и время отключения. Правда, есть один нюанс. Вызов `session.peer()` по сути вызывает `getpeername(2)` непосредственно на сокете. Поэтому при отключении мы уже не видим, кто отключается (`getpeername` вызывается на закрытом сокете). Cделаем небольшой хак. В Tarantool есть `box.session.storage` — временная таблица, в которую можно сохранять всё, что хочется, на время существования сессии. Во время подключения запомним, кто к нам подключился, чтобы знать, кто отключится. Это облегчает отладку. ```lua box.session.on_connect(function() box.session.storage.peer = box.session.peer() log.info( "connected %s from %s", box.session.id(), box.session.storage.peer ) end) box.session.on_disconnect(function() log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer ) end) ``` Теперь у нас есть событие отключения клиента. Нам нужно как-то освободить взятые им задачи. Введем понятие «владения задачей». Та сессия, которая взяла задачу, должна за неё отвечать. Заведем две таблички, в которые будем сохранять эти данные, и модифицируем функцию `take`. ```lua queue.taken = {}; -- список взятых задач queue.bysid = {}; -- список задач для конкретной сессии ``` ```lua function queue.take(timeout) if not timeout then timeout = 0 end local now = fiber.time() local found while not found do found = box.space.queue.index.status :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1) if not found then local left = (now + timeout) - fiber.time() if left <= 0 then return end queue._wait:get(left) end end local sid = box.session.id() log.info("Register %s by %s", found.id, sid) queue.taken[ found.id ] = sid queue.bysid[ sid ] = queue.bysid[ sid ] or {} queue.bysid[ sid ][ found.id ] = true return box.space.queue :update( {found.id}, {{'=', F.status, STATUS.TAKEN }}) end ``` Мы в ней запомним, что конкретная задача взята конкретной сессией. Также нам понадобится модифицировать код возврата задач, `ack` и `release`. Сделаем одну общую функцию. Будем проверять, что задача есть и взята, причём взята конкретной сессией. Таким образом нельзя будет из одного соединения взять задачу, а из другого прийти и сказать: «удалите её, я обработал». ```lua local function get_task( id ) if not id then error("Task id required", 2) end local t = box.space.queue:get{id} if not t then error(string.format( "Task {%s} was not found", id ), 2) end if not queue.taken[id] then error(string.format( "Task %s not taken by anybody", id ), 2) end if queue.taken[id] ~= box.session.id() then error(string.format( "Task %s taken by %d. Not you (%d)", id, queue.taken[id], box.session.id() ), 2) end return t end ``` Теперь функции `ack` и `release` становятся очень простыми. Мы в них вызываем `get_task`, который проверяет, что задача принадлежит нам и взята. И дальше уже с ней работаем. ```lua function queue.ack(id) local t = get_task(id) queue.taken[ t.id ] = nil queue.bysid[ box.session.id() ][ t.id ] = nil return box.space.queue:delete{t.id} end function queue.release(id) local t = get_task(id) if queue._wait:has_readers() then queue._wait:put(true,0) end queue.taken[ t.id ] = nil queue.bysid[ box.session.id() ][ t.id ] = nil return box.space.queue :update({t.id},{{'=', F.status, STATUS.READY }}) end ``` Для сброса состояния всех задач в `R` можно воспользоваться SQL или Lua-cниппетом: ``` box.execute[[ update "queue" set "status" = 'R' where "status" = 'T' ]] box.space.queue.index.status:pairs({'T'}):each(function(t) box.space.queue:update({t.id},{{'=',2,'R'}}) end) ``` Когда мы вызовем `consumer` повторно, он ответит `task ID required`. ``` $ tarantool consumer.lua Got task: --- [1594327004302379957, 'T', ['task 3']] ... ER_PROC_LUA: queue.lua:113: Task id required ``` Так мы находим первую проблему в нашем коде. Когда мы работаем внутри Tarantool, кортеж всегда ассоциирован со спейсом. У того есть формат, а у формата есть имена полей. Поэтому в тапле можно пользоваться именами полей. А когда мы выносим это за пределы базы, тапл становится просто массивом с набором полей. Доработаем формат возврата из функций и будем возвращать не таплы, а объекты с именами. Для этого воспользуемся методом `:tomap{ names_only = true }`: ```lua function queue.put(...) --- ... return box.space.queue :insert{ id, STATUS.READY, { ... } } :tomap{ names_only = true } end function queue.take(timeout) --- ... return box.space.queue :update( {found.id}, {{'=', F.status, STATUS.TAKEN }}) :tomap{ names_only = true } end function queue.ack(id) --- ... return box.space.queue:delete{t.id}:tomap{ names_only = true } end function queue.release(id) --- ... return box.space.queue :update({t.id},{{'=', F.status, STATUS.READY }}) :tomap{ names_only = true } end return queue ``` Поменяв это, мы столкнёмся с новой проблемой. ``` $ tarantool consumer.lua Got task: --- {'status': 'T', 'data': ['hi'], 'id': 1594327270675788959} ... ER_PROC_LUA: queue.lua:117: Task 1594327270675788959ULL not taken by anybody ``` При попытке освободить задачу система ответит, мы её не брали. При этом визуально мы увидим, что ID один и тот же. Только есть еще какая-то суффикс ULL. Здесь мы сталкиваемся с одной особенностью расширения LuaJIT: FFI (Foreign Function Interface). Давайте рассмотрим детальнее. Положим в таблицу пять значений, используя в качестве ключей различные варианты записи числа `1`. ``` tarantool> t = {} tarantool> t[1] = 1 tarantool> t["1"] = 2 tarantool> t[1LL] = 3 tarantool> t[1ULL] = 4 tarantool> t[1ULL] = 5 tarantool> t --- - 1: 1 1: 5 1: 4 '1': 2 1: 3 ... ``` Можно было бы предположить, что они положатся как `2` (строка + число). Максимум как `3` (строка, число, LL). Но при выводе на экран окажется, что все ключи лежат в таблице раздельно: мы видим все значения `1`, `2`, `3`, `4`, `5`. Более того, при сериализации мы не видим разницы между обычными, знаковыми и беззнаковыми числами. ``` tarantool> return t[1], t['1'], t[1LL], t[1ULL] --- - 1 - 2 - null - null ... ``` Но самое веселье наступает, если попытаться достать данные из таблицы. С обычными Lua-типами всё хорошо (`number` и `string`), а вот с LL (`long long`) и ULL (`unsigned long long`) — нет. Эти типы являются отдельным типом `cdata`. Он предназначен для работы с типами из языка C. И при сохранении в Lua-таблицу `cdata` хэшируется по адресу, а не по значению. У двух, пусть и одинаковых по значению, чисел просто два разных адреса. И когда мы складываем ULL в таблицу, то потом не можем по такому же значению достать его из таблицы. Поэтому нам придется немного переделать нашу очередь и владение ключами. Шаг вынужденный, но он позволяет нам в дальнейшем модифицировать наши ключи произвольным образом. Нам нужно каким-либо образом превратить наш ключ в строку или число. Возьмем MessagePack. В Tarantool он используется для хранения таплов и будет упаковывать наши значения так же, как это делает сам Tarantool. С его помощью мы превратим произвольный ключ в строку, которая будет являться ключом в нашей таблице. ```lua local msgpack = require 'msgpack' local function keypack( key ) return msgpack.encode( key ) end local function keyunpack( data ) return msgpack.decode( data ) end ``` Добавляем в `take` упаковку ключа и сохраняем его в таблице. В функции `get_task` проверим, что ключ прошел в правильном формате, и если это не так, то превратим его в `int64`. После этого воспользуемся тем же самым `keypack`, который упакует ключ в MessagePack. Поскольку этот упакованный ключ будет требоваться всем функциям, которые с ним работают, мы будем возвращать его из `get_task`, чтобы `ack` и `release` могли им пользоваться и вычищать его из сессий. ```lua function queue.take(timeout) if not timeout then timeout = 0 end local now = fiber.time() local found while not found do found = box.space.queue.index.status :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1) if not found then local left = (now + timeout) - fiber.time() if left <= 0 then return end queue._wait:get(left) end end local sid = box.session.id() log.info("Register %s by %s", found.id, sid) local key = keypack( found.id ) queue.taken[ key ] = sid queue.bysid[ sid ] = queue.bysid[ sid ] or {} queue.bysid[ sid ][ key ] = true return box.space.queue :update( {found.id}, {{'=', F.status, STATUS.TAKEN }}) :tomap{ names_only = true } end local function get_task( id ) if not id then error("Task id required", 2) end id = tonumber64(id) local key = keypack(id) local t = box.space.queue:get{id} if not t then error(string.format( "Task {%s} was not found", id ), 2) end if not queue.taken[key] then error(string.format( "Task %s not taken by anybody", id ), 2) end if queue.taken[key] ~= box.session.id() then error(string.format( "Task %s taken by %d. Not you (%d)", id, queue.taken[key], box.session.id() ), 2) end return t, key end function queue.ack(id) local t, key = get_task(id) queue.taken[ key ] = nil queue.bysid[ box.session.id() ][ key ] = nil return box.space.queue:delete{t.id}:tomap{ names_only = true } end function queue.release(id) local t, key = get_task(id) queue.taken[ key ] = nil queue.bysid[ box.session.id() ][ key ] = nil if queue._wait:has_readers() then queue._wait:put(true,0) end return box.space.queue :update({t.id},{{'=', F.status, STATUS.READY }}) :tomap{ names_only = true } end ``` Поскольку у нас есть есть триггер на отключение, мы теперь знаем, что отключилась конкретная сессия, которая владеет какими-то ключами. Можно взять все ключи этой сессии и автоматически вернуть их в исходное состояние — `ready`. Также, внутри этой сессии могут висеть ожидающие `take`. Оставим для них маркер в `session.storage`, что задачи брать не нужно. ```lua box.session.on_disconnect(function() log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer ) box.session.storage.destroyed = true local sid = box.session.id() local bysid = queue.bysid[ sid ] if bysid then while next(bysid) do for key, id in pairs(bysid) do log.info("Autorelease %s by disconnect", id); queue.taken[key] = nil bysid[key] = nil local t = box.space.queue:get(id) if t then if queue._wait:has_readers() then queue._wait:put(true,0) end box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }}) end end end queue.bysid[ sid ] = nil end end) function queue.take(timeout) if not timeout then timeout = 0 end local now = fiber.time() local found while not found do found = box.space.queue.index.status :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1) if not found then local left = (now + timeout) - fiber.time() if left <= 0 then return end queue._wait:get(left) end end if box.session.storage.destroyed then return end local sid = box.session.id() log.info("Register %s by %s", found.id, sid) local key = keypack( found.id ) queue.taken[ key ] = sid queue.bysid[ sid ] = queue.bysid[ sid ] or {} queue.bysid[ sid ][ key ] = found.id return box.space.queue :update( {found.id}, {{'=', F.status, STATUS.TAKEN }}) :tomap{ names_only = true } end ``` Для теста можно брать задачи командой: ``` tarantoolctl connect 127.0.0.1:3301 <<< 'queue.take()' ``` Пока это всё отлаживалось, можно было столкнуться с тем, что вы взяли задачи, потушили очередь, запустили заново - задачи никому не принадлежат (потому что соединения порвались при выключении), но при этом они в статусе `taken`. Поэтому добавим в код модификацию статусов при старте: база запускается и освобождает все взятые задачи. ```lua while true do local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1) if not t then break end box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }}) log.info("Autoreleased %s at start", t.id) end ``` Получилась очередь, готовая к эксплуатации. ### Добавим отложенную обработку Осталось добавить отложенные задачи. Для этого добавим новое поле и индекс по нему. В этом поле мы будем хранить время, когда определённую задачу нужно перевести в другое состояние. Модифицируем функцию `put` и добавим новый статус:` W=WAITING`. ```lua box.space.queue:format( { { name = 'id'; type = 'number' }, { name = 'status'; type = 'string' }, { name = 'runat'; type = 'number' }, { name = 'data'; type = '*' }, } ) box.space.queue:create_index('runat', { parts = { F.runat, 'number', F.id, 'number' }; if_not_exists = true; }) STATUS.WAITING = 'W' ``` Поскольку мы кардинально меняем схему и это режим разработки, очистим предыдущую схему (выполняем в консоли): ```lua box.space.queue.drop() box.snapshot() ``` Перезапустим очередь. В `put` и `release` добавим поддержку `delay`. Если `delay` передан, то присваиваем задаче состояние `WAITING` и определяем, в какой момент времени она должна быть обработана. Также нам понадобится обработчик. Для этого мы можем воспользоваться фоновыми файберами. В любой момент можно создать файбер, не ассоциированный ни с каким соединением и который будет работать в фоне. Создадим файбер, который будет крутиться бесконечно и ждать ближайшей задачи. ```lua function queue.put(data, opts) local id = gen_id() local runat = 0 local status = STATUS.READY if opts and opts.delay then runat = clock.realtime() + tonumber(opts.delay) status = STATUS.WAITING else if queue._wait:has_readers() then queue._wait:put(true,0) end end return box.space.queue :insert{ id, status, runat, data } :tomap{ names_only=true } end function queue.release(id, opts) local t, key = get_task(id) queue.taken[ key ] = nil queue.bysid[ box.session.id() ][ key ] = nil local runat = 0 local status = STATUS.READY if opts and opts.delay then runat = clock.realtime() + tonumber(opts.delay) status = STATUS.WAITING else if queue._wait:has_readers() then queue._wait:put(true,0) end end return box.space.queue :update({t.id},{{ '=', F.status, status },{ '=', F.runat, runat }}) :tomap{ names_only = true } end ``` Если приходит время какой-то задачи, мы её модифицируем. Переводим из статуса ожидания в статус готовности, также нотифицируя тех клиентов, которые могут ждать задачу. Теперь кладём задачу с задержкой. Вызываем `take`, готовой задачи нет. Вызываем повторно, уже с таймаутом, который укладывается в появление задачи. Как только она появляется, мы видим, что это заслуга файбера `queue.runat`. ```lua queue._runat = fiber.create(function() fiber.name('queue.runat') while true do local remaining local now = clock.realtime() for _,t in box.space.queue.index.runat :pairs( { 0 }, { iterator = 'GT' }) do if t.runat > now then remaining = t.runat - now break else if t.status == STATUS.WAITING then log.info("Runat: W->R %s",t.id) if queue._wait:has_readers() then queue._wait:put(true,0) end box.space.queue:update({ t.id }, { {'=', F.status, STATUS.READY }, {'=', F.runat, 0 }, }) else log.error("Runat: bad status %s for %s", t.status, t.id) box.space.queue:update({ t.id },{{ '=', F.runat, 0 }}) end end end if not remaining or remaining > 1 then remaining = 1 end fiber.sleep(remaining) end end) ``` ### Мониторинг Нельзя забывать про мониторинг очереди, потому что самое печальное, что может случиться с вашей очередью: она станет очень большой. Или вообще закончится. Мы довольно просто можем наивно и в лоб собрать статистику с нашей очереди: посчитать количество по всем статусам, которые есть, и начать отправлять данные в мониторинг. ```lua function queue.stats() return { total = box.space.queue:len(), ready = box.space.queue.index.status:count({STATUS.READY}), waiting = box.space.queue.index.status:count({STATUS.WAITING}), taken = box.space.queue.index.status:count({STATUS.TAKEN}), } end ``` ``` tarantool> queue.stats() --- - ready: 10 taken: 2 waiting: 5 total: 17 ... tarantool> local clock = require 'clock' local s = clock.time() local r = queue.stats() return r, clock.time() - s --- - ready: 10 taken: 2 waiting: 5 total: 17 - 0.00057339668273926 ... ``` Такой мониторинг будет работать довольно быстро. До того момента, пока задач не станет очень много. Нормальное состояние очереди — пустое. Но предположим, что-то случается и прилетает, например, миллион задач. Наша функция `stats` продолжает показывать корректное значение. Правда, она начинает работать довольно медленно. Проблема в вызове `index:count` — это всегда full-scan по индексу. Давайте закэшируем значения счетчиков. ```lua queue._stats = {} for k,v in pairs(STATUS) do queue._stats[v] = 0LL end for _,t in box.space.queue:pairs() do queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1 end function queue.stats() return { total = box.space.queue:len(), ready = queue._stats[ STATUS.READY ], waiting = queue._stats[ STATUS.WAITING ], taken = queue._stats[ STATUS.TAKEN ], } end ``` Теперь эта функция начнет работать очень быстро, независимо от количества записей. Осталось обновлять счетчики при любых операциях. При каждой операции мы должны одно значение уменьшить, другое увеличить. Можно, конечно, вручную расставить апдейты по функциям, но это чревато ошибками и расхождениями. К счастью в Tarantool есть триггеры на спейсах. Триггер видит любое изменение в спейсе. Можно даже вручную выполнить `space:update` или `space:delete`, триггер это учтет и посчитает. Триггер будет учитывать все статусы именно по значению, по которому они хранятся в базе. При рестарте мы единоразово подсчитаем значения всех счётчиков. ```lua box.space.queue:on_replace(function(old,new) if old then queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1 end if new then queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1 end end) ``` Осталась ещё одна операция, которую нельзя отловить непосредственно в спейсе, но которая влияет на его содержимое: `space:truncate()`. Отследить очистку спейса можно при помощи отдельного триггера в спейсе — `_truncate`. ```lua box.space._truncate:on_replace(function(old,new) if new.id == box.space.queue.id then for k,v in pairs(queue._stats) do queue._stats[k] = 0LL end end end) ``` После этого всё начинает работать точно и консистентно. И теперь мы можем, например, отправлять статистику по сети. Вообще в Tarantool есть удобные неблокирующие сокеты. Работать с ними можно достаточно низкоуровнево, почти как в C. Для демонстрации мы можем сделать отправку метрик в формате graphite по UDP: ```lua local socket = require 'socket' local errno = require 'errno' local graphite_host = '127.0.0.1' local graphite_port = 2003 local ai = socket.getaddrinfo(graphite_host, graphite_port, 1, { type = 'SOCK_DGRAM' }) local addr,port for _,info in pairs(ai) do addr,port = info.host,info.port break end if not addr then error("Failed to resolve host") end queue._monitor = fiber.create(function() fiber.name('queue.monitor') fiber.yield() local remote = socket('AF_INET', 'SOCK_DGRAM', 'udp') while true do for k,v in pairs(queue.stats()) do local msg = string.format("queue.stats.%s %s %s\n", k, tonumber(v), math.floor(fiber.time())) local res = remote:sendto(addr, port, msg) if not res then log.error("Failed to send: %s", errno.strerror(errno())) end end fiber.sleep(1) end end) ``` или по TCP: ```lua local socket = require 'socket' local errno = require 'errno' local graphite_host = '127.0.0.1' local graphite_port = 2003 queue._monitor = fiber.create(function() fiber.name('queue.monitor') fiber.yield() while true do local remote = require 'socket'.tcp_connect(graphite_host, graphite_port) if not remote then log.error("Failed to connect to graphite %s",errno.strerror()) fiber.sleep(1) else while true do local data = {} for k,v in pairs(queue.stats()) do table.insert(data,string.format("queue.stats.%s %s %s\n",k,tonumber(v),math.floor(fiber.time()))) end data = table.concat(data,'') if not remote:send(data) then log.error("%s",errno.strerror()) break end fiber.sleep(1) end end end end) ``` ### Горячая перезагрузка кода И наконец, нельзя не упомянуть такую важную возможность платформы Tarantool, как горячая перезагрузка кода. В обычных приложениях такая функциональность не столь востребована, но когда у вас БД на гигабайты в памяти и любой перезапуск будет стоить вам времени запуска, это может сослужить отличную службу. Давайте рассмотрим, что необходимо сделать для горячей перезагрузки кода. Когда Lua загружает какой-то код через `require`, содержимое этого файла интерпретируется и возвращённый результат кешируется в системной таблице `package.loaded` под именем модуля. Последующие вызовы `require` того же самого модуля не будут повторно читать файл, а будут возвращать закешированное значение. Чтобы заставить Lua повторно прочитать файл и загрузить его, достаточно просто стереть запись из `package.loaded[...]` и повторно вызвать `require`. Также мы должны запомнить то, что предзагружено самим рантаймом, потому что файлов для перезагрузки встроенных модулей не будет. Простейший сниппет кода для обработки релоада может выглядеть как-то так: ```lua require'strict'.on() fiber = require 'fiber' box.cfg{ listen = '127.0.0.1:3301' } box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true }) local not_first_run = rawget(_G,'_NOT_FIRST_RUN') _NOT_FIRST_RUN = true if not_first_run then for k,v in pairs(package.loaded) do if not preloaded[k] then package.loaded[k] = nil end end else preloaded = {} for k,v in pairs(package.loaded) do preloaded[k] = true end end queue = require 'queue' require'console'.start() os.exit() ``` Поскольку релоад кода является довольно типичной и регулярной задачей, у нас уже есть готовый модуль [package.reload](https://github.com/moonlibs/package-reload), которым мы пользуемся в подавляющем большинстве приложений. Он сам запоминает, из какого файла всё было загружено, какие модули были предзагружены, и предоставляет удобный вызов для инициации перезагрузки: `package.reload()`. ```lua require'strict'.on() fiber = require 'fiber' box.cfg{ listen = '127.0.0.1:3301' } box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true }) require 'package.reload' queue = require 'queue' require'console'.start() os.exit() ``` Чтобы код можно было перезагружать, нужно писать его немного по-другому. Учитывайте, что код может исполняться повторно. Первый раз он исполняется при первой загрузке, остальные разы — при перезагрузке. Соответственно, нам необходимо явно обрабатывать эту ситуацию. ```lua local queue = {} local old = rawget(_G,'queue') if old then queue.taken = old.taken queue.bysid = old.bysid queue._triggers = old._triggers queue._stats = old._stats queue._wait = old._wait queue._runch = old._runch queue._runat = old._runat else queue.taken = {} queue.bysid = {} queue._triggers = {} queue._stats = {} queue._wait = fiber.channel() queue._runch = fiber.cond() while true do local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1) if not t then break end box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }}) log.info("Autoreleased %s at start", t.id) end for k,v in pairs(STATUS) do queue._stats[v] = 0LL end for _,t in box.space.queue:pairs() do queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1 end log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats }) end ``` Также учитывайте перезагрузку триггеров. Если оставить как было, то каждая перезагрузка будет порождать установку дополнительного триггера. Но триггеры поддерживают указание старой функции, при этом установка триггера возвращает её. Поэтому просто будем сохранять результат установки в переменную и передавать её в качестве аргумента. При первом запуске переменной не будет и установится новый триггер. При последующих загрузках триггер будет заменяться. ```lua queue._triggers.on_replace = box.space.queue:on_replace(function(old,new) if old then queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1 end if new then queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1 end end, queue._triggers.on_replace) queue._triggers.on_truncate = box.space._truncate:on_replace(function(old,new) if new.id == box.space.queue.id then for k,v in pairs(queue._stats) do queue._stats[k] = 0LL end end end, queue._triggers.on_truncate) queue._triggers.on_connect = box.session.on_connect(function() box.session.storage.peer = box.session.peer() log.info( "connected %s from %s", box.session.id(), box.session.storage.peer ) end, queue._triggers.on_connect) queue._triggers.on_disconnect = box.session.on_disconnect(function() log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer ) box.session.storage.destroyed = true local sid = box.session.id() local bysid = queue.bysid[ sid ] if bysid then while next(bysid) do for key, id in pairs(bysid) do log.info("Autorelease %s by disconnect", id); queue.taken[key] = nil bysid[key] = nil local t = box.space.queue:get(id) if t then if queue._wait:has_readers() then queue._wait:put(true,0) end box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }}) end end end queue.bysid[ sid ] = nil end end, queue._triggers.on_disconnect) ``` Ещё один важный элемент перезагрузки — это файберы. Файбер запускается в фоне, мы его никак не контролируем. В нём написано `while ... true`, он никогда не завершится и сам по себе не перезагрузится. Для того, чтобы взаимодействовать с ним, нам понадобится канал, а ещё лучше `fiber.cond`: condition variable, предназначенная для передачи сигналов файберам. Есть несколько различных подходов к перезагрузке файберов. Например, можно уничтожать старые при помощи вызова `fiber.kill`, но такой подход не очень консистентен: мы можем вызвать `kill` в неподходящий момент. Поэтому в большинстве случаев мы пользуемся признаком поколения файбера: файбер продолжает свою работу только в том поколении, в котором он был создан. При перезагрузке кода поколение меняется и файбер чисто завершается. Также мы можем защититься от одновременной работы нескольких файберов: для этого мы можем смотреть на статус файбера предыдущего поколения. ```lua queue._runat = fiber.create(function(queue, gen, old_fiber) fiber.name('queue.runat.'..gen) while package.reload.count == gen and old_fiber and old_fiber:status() ~= 'dead' do log.info("Waiting for old to die") queue._runch:wait(0.1) end log.info("Started...") while package.reload.count == gen do local remaining local now = clock.realtime() for _,t in box.space.queue.index.runat :pairs( {0}, { iterator = 'GT' }) do if t.runat > now then remaining = t.runat - now break else if t.status == STATUS.WAITING then log.info("Runat: W->R %s",t.id) if queue._wait:has_readers() then queue._wait:put(true,0) end box.space.queue:update({ t.id }, { { '=', F.status, STATUS.READY }, { '=', F.runat, 0 }, }) else log.error("Runat: bad status %s for %s", t.status, t.id) box.space.queue:update({ t.id },{{ '=', F.runat, 0 }}) end end end if not remaining or remaining > 1 then remaining = 1 end queue._runch:wait(remaining) end queue._runch:broadcast() log.info("Finished") end, queue, package.reload.count, queue._runat) queue._runch:broadcast() ``` И напоследок: при перезагрузке кода у вас будет ошибка, что консоль уже запущена. Обработать эту ситуацию можно следующим способом: ```lua if not fiber.self().storage.console then require'console'.start() os.exit() end ``` ### Подведем итог Мы написали работающую сетевую очередь с возможностью отложенной обработки, с автовозвратом задач при помощи триггеров, с отправкой статистики в Graphite по TCP, и рассмотрели довольно много нюансов. На среднестатистическом современном железе такая очередь без проблем выдержит передачу от 20 тыс. сообщений в секунду. Она состоит примерно из 300 строк кода и пишется за день с изучением документации. :::spoiler Результирующие файлы `queue.lua`: ```lua local clock = require 'clock' local errno = require 'errno' local fiber = require 'fiber' local log = require 'log' local msgpack = require 'msgpack' local socket = require 'socket' box.schema.create_space('queue',{ if_not_exists = true; }) box.space.queue:format( { { name = 'id'; type = 'number' }, { name = 'status'; type = 'string' }, { name = 'runat'; type = 'number' }, { name = 'data'; type = '*' }, } ); local F = {} for no,def in pairs(box.space.queue:format()) do F[no] = def.name F[def.name] = no end box.space.queue:create_index('primary', { parts = { F.id, 'number' }; if_not_exists = true; }) box.space.queue:create_index('status', { parts = { F.status, 'string', F.id, 'number' }; if_not_exists = true; }) box.space.queue:create_index('runat', { parts = { F.runat, 'number', F.id, 'number' }; if_not_exists = true; }) local STATUS = {} STATUS.READY = 'R' STATUS.TAKEN = 'T' STATUS.WAITING = 'W' local queue = {} local old = rawget(_G,'queue') if old then queue.taken = old.taken queue.bysid = old.bysid queue._triggers = old._triggers queue._stats = old._stats queue._wait = old._wait queue._runch = old._runch queue._runat = old._runat else queue.taken = {} queue.bysid = {} queue._triggers = {} queue._stats = {} queue._wait = fiber.channel() queue._runch = fiber.cond() while true do local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1) if not t then break end box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }}) log.info("Autoreleased %s at start", t.id) end for k,v in pairs(STATUS) do queue._stats[v] = 0LL end for _,t in box.space.queue:pairs() do queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1 end log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats }) end local function gen_id() local new_id repeat new_id = clock.realtime64() until not box.space.queue:get(new_id) return new_id end local function keypack( key ) return msgpack.encode( key ) end local function keyunpack( data ) return msgpack.decode( data ) end queue._triggers.on_replace = box.space.queue:on_replace(function(old,new) if old then queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1 end if new then queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1 end end, queue._triggers.on_replace) queue._triggers.on_truncate = box.space._truncate:on_replace(function(old,new) if new.id == box.space.queue.id then for k,v in pairs(queue._stats) do queue._stats[k] = 0LL end end end, queue._triggers.on_truncate) queue._triggers.on_connect = box.session.on_connect(function() box.session.storage.peer = box.session.peer() end, queue._triggers.on_connect) queue._triggers.on_disconnect = box.session.on_disconnect(function() box.session.storage.destroyed = true local sid = box.session.id() local bysid = queue.bysid[ sid ] if bysid then log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer ) while next(bysid) do for key, id in pairs(bysid) do log.info("Autorelease %s by disconnect", id); queue.taken[key] = nil bysid[key] = nil local t = box.space.queue:get(id) if t then if queue._wait:has_readers() then queue._wait:put(true,0) end box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }}) end end end queue.bysid[ sid ] = nil end end, queue._triggers.on_disconnect) queue._runat = fiber.create(function(queue, gen, old_fiber) fiber.name('queue.runat.'..gen) while package.reload.count == gen and old_fiber and old_fiber:status() ~= 'dead' do log.info("Waiting for old to die") queue._runch:wait(0.1) end log.info("Started...") while package.reload.count == gen do local remaining local now = clock.realtime() for _,t in box.space.queue.index.runat :pairs( {0}, { iterator = 'GT' }) do if t.runat > now then remaining = t.runat - now break else if t.status == STATUS.WAITING then log.info("Runat: W->R %s",t.id) if queue._wait:has_readers() then queue._wait:put(true,0) end box.space.queue:update({ t.id }, { { '=', F.status, STATUS.READY }, { '=', F.runat, 0 }, }) else log.error("Runat: bad status %s for %s", t.status, t.id) box.space.queue:update({ t.id },{{ '=', F.runat, 0 }}) end end end if not remaining or remaining > 1 then remaining = 1 end queue._runch:wait(remaining) end queue._runch:broadcast() log.info("Finished") end, queue, package.reload.count, queue._runat) queue._runch:broadcast() local graphite_host = '127.0.0.1' local graphite_port = 2003 queue._monitor = fiber.create(function(gen) fiber.name('queue.mon.'..gen) fiber.yield() while package.reload.count == gen do local remote = require 'socket'.tcp_connect(graphite_host, graphite_port) if not remote then log.error("Failed to connect to graphite %s",errno.strerror()) fiber.sleep(1) else while package.reload.count == gen do local data = {} for k,v in pairs(queue.stats()) do table.insert(data,string.format("queue.stats.%s %s %s\n",k,tonumber(v),math.floor(fiber.time()))) end data = table.concat(data,'') if not remote:send(data) then log.error("%s",errno.strerror()) break end fiber.sleep(1) end end end end, package.reload.count) function queue.put(data, opts) local id = gen_id() local runat = 0 local status = STATUS.READY if opts and opts.delay then runat = clock.realtime() + tonumber(opts.delay) status = STATUS.WAITING else if queue._wait:has_readers() then queue._wait:put(true,0) end end return box.space.queue :insert{ id, status, runat, data } :tomap{ names_only=true } end function queue.take(timeout) if not timeout then timeout = 0 end local now = fiber.time() local found while not found do found = box.space.queue.index.status :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1) if not found then local left = (now + timeout) - fiber.time() if left <= 0 then return end queue._wait:get(left) end end if box.session.storage.destroyed then return end local sid = box.session.id() log.info("Register %s by %s", found.id, sid) local key = keypack( found.id ) queue.taken[ key ] = sid queue.bysid[ sid ] = queue.bysid[ sid ] or {} queue.bysid[ sid ][ key ] = found.id return box.space.queue :update( {found.id}, {{'=', F.status, STATUS.TAKEN }}) :tomap{ names_only = true } end local function get_task( id ) if not id then error("Task id required", 2) end id = tonumber64(id) local key = keypack(id) local t = box.space.queue:get{id} if not t then error(string.format( "Task {%s} was not found", id ), 2) end if not queue.taken[key] then error(string.format( "Task %s not taken by anybody", id ), 2) end if queue.taken[key] ~= box.session.id() then error(string.format( "Task %s taken by %d. Not you (%d)", id, queue.taken[key], box.session.id() ), 2) end return t, key end function queue.ack(id) local t, key = get_task(id) queue.taken[ key ] = nil queue.bysid[ box.session.id() ][ key ] = nil return box.space.queue:delete{t.id}:tomap{ names_only = true } end function queue.release(id, opts) local t, key = get_task(id) queue.taken[ key ] = nil queue.bysid[ box.session.id() ][ key ] = nil local runat = 0 local status = STATUS.READY if opts and opts.delay then runat = clock.realtime() + tonumber(opts.delay) status = STATUS.WAITING else if queue._wait:has_readers() then queue._wait:put(true,0) end end return box.space.queue :update({t.id},{{'=', F.status, status },{ '=', F.runat, runat }}) :tomap{ names_only = true } end function queue.stats() return { total = box.space.queue:len(), ready = queue._stats[ STATUS.READY ], waiting = queue._stats[ STATUS.WAITING ], taken = queue._stats[ STATUS.TAKEN ], } end return queue ``` `init.lua`: ```lua require'strict'.on() fiber = require 'fiber' require 'package.reload' box.cfg{ listen = '127.0.0.1:3301' } box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true }) queue = require 'queue' if not fiber.self().storage.console then require'console'.start() os.exit() end ``` :::

    Import from clipboard

    Paste your markdown or webpage here...

    Advanced permission required

    Your current role can only read. Ask the system administrator to acquire write and comment permission.

    This team is disabled

    Sorry, this team is disabled. You can't edit this note.

    This note is locked

    Sorry, only owner can edit this note.

    Reach the limit

    Sorry, you've reached the max length this note can be.
    Please reduce the content or divide it to more notes, thank you!

    Import from Gist

    Import from Snippet

    or

    Export to Snippet

    Are you sure?

    Do you really want to delete this note?
    All users will lose their connection.

    Create a note from template

    Create a note from template

    Oops...
    This template has been removed or transferred.
    Upgrade
    All
    • All
    • Team
    No template.

    Create a template

    Upgrade

    Delete template

    Do you really want to delete this template?
    Turn this template into a regular note and keep its content, versions, and comments.

    This page need refresh

    You have an incompatible client version.
    Refresh to update.
    New version available!
    See releases notes here
    Refresh to enjoy new features.
    Your user state has changed.
    Refresh to load new user state.

    Sign in

    Forgot password

    or

    By clicking below, you agree to our terms of service.

    Sign in via Facebook Sign in via Twitter Sign in via GitHub Sign in via Dropbox Sign in with Wallet
    Wallet ( )
    Connect another wallet

    New to HackMD? Sign up

    Help

    • English
    • 中文
    • Français
    • Deutsch
    • 日本語
    • Español
    • Català
    • Ελληνικά
    • Português
    • italiano
    • Türkçe
    • Русский
    • Nederlands
    • hrvatski jezik
    • język polski
    • Українська
    • हिन्दी
    • svenska
    • Esperanto
    • dansk

    Documents

    Help & Tutorial

    How to use Book mode

    Slide Example

    API Docs

    Edit in VSCode

    Install browser extension

    Contacts

    Feedback

    Discord

    Send us email

    Resources

    Releases

    Pricing

    Blog

    Policy

    Terms

    Privacy

    Cheatsheet

    Syntax Example Reference
    # Header Header 基本排版
    - Unordered List
    • Unordered List
    1. Ordered List
    1. Ordered List
    - [ ] Todo List
    • Todo List
    > Blockquote
    Blockquote
    **Bold font** Bold font
    *Italics font* Italics font
    ~~Strikethrough~~ Strikethrough
    19^th^ 19th
    H~2~O H2O
    ++Inserted text++ Inserted text
    ==Marked text== Marked text
    [link text](https:// "title") Link
    ![image alt](https:// "title") Image
    `Code` Code 在筆記中貼入程式碼
    ```javascript
    var i = 0;
    ```
    var i = 0;
    :smile: :smile: Emoji list
    {%youtube youtube_id %} Externals
    $L^aT_eX$ LaTeX
    :::info
    This is a alert area.
    :::

    This is a alert area.

    Versions and GitHub Sync
    Get Full History Access

    • Edit version name
    • Delete

    revision author avatar     named on  

    More Less

    Note content is identical to the latest version.
    Compare
      Choose a version
      No search result
      Version not found
    Sign in to link this note to GitHub
    Learn more
    This note is not linked with GitHub
     

    Feedback

    Submission failed, please try again

    Thanks for your support.

    On a scale of 0-10, how likely is it that you would recommend HackMD to your friends, family or business associates?

    Please give us some advice and help us improve HackMD.

     

    Thanks for your feedback

    Remove version name

    Do you want to remove this version name and description?

    Transfer ownership

    Transfer to
      Warning: is a public team. If you transfer note to this team, everyone on the web can find and read this note.

        Link with GitHub

        Please authorize HackMD on GitHub
        • Please sign in to GitHub and install the HackMD app on your GitHub repo.
        • HackMD links with GitHub through a GitHub App. You can choose which repo to install our App.
        Learn more  Sign in to GitHub

        Push the note to GitHub Push to GitHub Pull a file from GitHub

          Authorize again
         

        Choose which file to push to

        Select repo
        Refresh Authorize more repos
        Select branch
        Select file
        Select branch
        Choose version(s) to push
        • Save a new version and push
        • Choose from existing versions
        Include title and tags
        Available push count

        Pull from GitHub

         
        File from GitHub
        File from HackMD

        GitHub Link Settings

        File linked

        Linked by
        File path
        Last synced branch
        Available push count

        Danger Zone

        Unlink
        You will no longer receive notification when GitHub file changes after unlink.

        Syncing

        Push failed

        Push successfully