# Tightbeam > a focused beam of energy (such as a laser) that contains and transmits information --- # What is an RTCDP anyway? --- ## RTCDP RTCDP (Real Time Customer Data Plaform) wants to be the one stop shop to show users the sources and destinations for all their org's data, and how it all flows in real time. ![inline](https://i.imgur.com/03LHqts.png) --- ## Alloy Alloy instuments all Adobe tags in a single tag. Users send XDM data to the edge and Konductor sends it to all the Adobe places. Alloy makes use of a Blackbird Config to store these settings that can be changed independent of launch publishing. --- ## Blackbird Blackbird is the Config service. It stores configuration that determines where the alloy hits are sent. When configs are altered it notifies Konductor via Hollow and other listeners via Pipeline. --- ## Konductor Konductor uses the Blackbird config to route data sent to a given `config_id` to the appropriate Adobe services. --- ## How does Launch fit in to the picture? Launch has integrated with Flow service to represent how data flows from a Launch property into a Platform data set. When the Alloy extension is installed on a property, Launch creates several records in Platform: - A Source (representing a Launch property with Alloy installed) - A Destination (representing a Platform dataset configured in Blackbird) - A Flow that ties them together for Platform users to visualize. --- ## What does this look like in Platform? The platform UI uses these records to visualize all of the data flows. ![](https://i.imgur.com/ahk9vuI.png) > [name=is this the right place for this data?] --- ## What user actions cause Launch to update Flow? - Install Alloy supplying a Blackbird config_id - Edit Alloy's Blackbird `config_id` - Edit Blackbird Config directly in Launch without updating the extension. --- # The Implementation ## High Level - Connect to Flow API to create flow records - Get configurations from Blackbird - Get configuration updates from Blackbird --- ## Components - Relay: Forwards config updates from Blackbird to Launch - Blacksmith: Consumes config updates, Alloy extension installs, and creates Flow Records --- ## Flow-verview ``` - connectionSpecs # exists in the Flow Repository - baseConnections # created once - sourceConnections # flow_source_connections - targetConnections # flow_target_connections - flows # flow_flows ``` --- ## Relay Up until now Relay has been used only as our callback delivery service We added the ability to consume from Pipeline --- ![](https://miro.medium.com/max/2000/1*1JUfnJLEtIXHB_ygoy5uKg.png) --- ### What is Elixir? - Built on Erlang - Functional paradigm - Open Telecom Protocol (OTP) - Often thought of as a "process oriented language" --- #### What do we like about Elixir? - Gen servers - managed children, restarts, (let the managed child die and restart) - OTP is designed for fault tolerance and long lived connections - Ease of scaling - processes are extremely light weight - spin up millions of worker processes to process in parallel --- ### How it works? - Spin up a `Relay.Pipeline.Consumer` process which: - Connects to Pipeline - Receives messages from Pipeline asynchronously - Takes action based on those messages - Ah, ha, ha, ha, stayin' alive --- ### Spinning up a Process ```elixir worker( Relay.Pipeline.Consumer, [ %{ topic: "dx_exeg_config_updates", group: "activation-relay", handler: Relay.Pipeline.BlackbirdHandler } ] ) ``` --- ### Connect to Pipeline ```elixir def handle_info(:connect, state) do %{topic: topic, group: group, topic_url: topic_url} = state headers = ["Accept": "application/json"] options = stream_to: self(), recv_timeout: 90000 case HTTPoison.get(topic_url, headers, options) do {:ok, %HTTPoison.AsyncResponse{}} -> # Connected! {:error, %HTTPoison.Error{reason: reason}} -> # Attempt to reconnect Process.send_after(self(), :connect, @reconnect_interval) end {:noreply, state} end ``` --- ### Receiving messages from Pipeline --- #### DATA > Pipeline sends these containing payloads to be processed ```elixir def process_envelope(%{"envelopeType" => "DATA", "pipelineMessage" => message }, %{handler: handler}) do # Delegate message processing to the handler case handler.process_message(message) do :ok -> # Success! {:error, reason} -> exception = %RuntimeError{message: "Error message!"} reconnect!(exception, false) end end ``` --- #### PING > Pipeline sends these every 30 seconds to verify connection is alive ```elixir def process_envelope(%{ "envelopeType" => "PING" }, _state) do # Just log, because we don't need to do anything end ``` --- #### SYNC > Pipeline sends these to allow a consumer to mark its place in the stream to save progress ```elixir def process_envelope(%{ "envelopeType" => "SYNC", "syncMarker" => marker }, %{sync_url: sync_url}) do # Echo the sync message back to pipeline response = HTTPoison.post(sync_url, marker) handle_sync_response(response) end ``` --- ### Ah, ha, ha, ha, stayin' alive - Elixir _excels_ at failure - If `something goes wrong`, - Pipeline crashes - We can't connect to RabbitMQ - Then, - Raise an exception, and crash! - Relay will reconnect after an appropriate reconnect interval and continue where it left off --- ## Blacksmith --- ### How it works? - FlowEventConsumer - Is triggered when Alloy is installed or updated - BlackbirdEventConsumer - Responds to messages forwarded from Blackbird by Relay --- ### FlowEventConsumer An extension has changed (for now just the Alloy extension). This triggers a series of steps: - Look up the config_id in Blackbird - Compare it with the locally cached flow state - Update Flow as needed --- ### BlackbirdEventConsumer A Blackbird configuration has changed. Since we _may_ be using this config, we trigger a series of steps: - Look up the config_id in Blackbird - Find all extensions using that config_id - For each extension, - Compare it with the locally cached flow state - Update Flow as needed --- ## Questions? ---
{"metaMigratedAt":"2023-06-15T08:23:36.465Z","metaMigratedFrom":"Content","title":"Tightbeam","breaks":true,"contributors":"[{\"id\":\"2d018edc-6bad-4d8d-9fe9-5c2922bda892\",\"add\":16610,\"del\":10113},{\"id\":null,\"add\":1800,\"del\":2092}]"}
    293 views