Elixir

Support for Elixir is provided through fluvio-ex, a community-supported project by @viniarck.

 

Create a topic

This creates a topic named lobby with 1 partition and a replication factor of 1.

alias Fluvio.Admin

# Obtain the admin handle; a reply that is not {:ok, pid} raises MatchError.
{:ok, pid} = Admin.start_link()

# Settings for the new topic: one partition, replication set to 1.
topic_settings = %{partitions: 1, replication: 1}

# Create the "lobby" topic; the match fails loudly on any non-{:ok, _} reply.
{:ok, _} = Admin.create_topic(pid, "lobby", topic_settings)

Link to original source

 

Producer

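In this example, a Producer for the topic lobby is created. Then the message hello is sent to the topic and flushed. Next, twenty values (1 to 20) are sent asynchronously in two chunks of 10, with a flush after each chunk. Only the sends and flushes that return an error are collected, so matching the final result against the empty list [] asserts that every operation succeeded.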

alias Fluvio.Producer

{:ok, pid} = Producer.start_link(%{topic: "lobby"})

# Send a single record, then flush; each match raises MatchError on a
# non-{:ok, _} reply.
{:ok, _} = Producer.send(pid, "hello")
{:ok, _} = Producer.flush(pid)

# Keeps only the {result, tag} pairs whose result is an {:error, _} tuple;
# pairs that do not match the pattern are skipped by the comprehension.
only_failures = fn outcomes ->
  for {{:error, _msg}, _tag} = failure <- outcomes, do: failure
end

# Send 1..20 in batches of 10. Within a batch every value is sent from its own
# Task and all tasks are awaited before the batch is flushed. Each batch
# contributes its failed sends followed by its failed flush (if any), so the
# match against [] only succeeds when nothing failed.
[] =
  1..20
  |> Enum.chunk_every(10)
  |> Enum.flat_map(fn batch ->
    send_failures =
      batch
      |> Enum.map(fn value ->
        Task.async(fn -> {Producer.send(pid, to_string(value)), value} end)
      end)
      |> Task.await_many()
      |> only_failures.()

    flush_failures = only_failures.([{Producer.flush(pid), :flush}])

    send_failures ++ flush_failures
  end)

Link to original source

 

Consumer

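In this example, a Consumer for the topic lobby is created, starting from the beginning of the topic (offset 0). As records are received, the record contents are printed with IO.inspect; if an error is received instead, its message is printed with an "Error: " prefix.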

alias Fluvio.Consumer

# Consumer settings: read the "lobby" topic with the from_beginning offset set to 0.
consumer_opts = %{topic: "lobby", offset: [from_beginning: 0]}

{:ok, pid} = Consumer.start_link(consumer_opts)

# Callback applied to each streamed result: print the record on success,
# or the error message with an "Error: " prefix on failure.
print_result = fn result ->
  case result do
    {:ok, record} -> IO.inspect(record)
    {:error, msg} -> IO.inspect("Error: #{msg}")
  end
end

Consumer.stream_each(pid, print_result)

Link to original source