Categoria: MLOps

  • Orquestrando scripts e agendando-os com Prefect: introdução ao MLOps

    Orquestrando scripts e agendando-os com Prefect: introdução ao MLOps

    Tabela de conteudos


    O que é um orquestrador?

    Basicamente, ele coordena múltiplas aplicações, juntando as tarefas e definindo sua ordem de execução. No Python, temos alguns, como o Dagster, o Airflow e o utilizado aqui, que será o Prefect, que ganhou uma versão 3.0.

    Mas por que usar um orquestrador?

    Um orquestrador permite diversas coisas, entre elas, agendar a execução do código. Isso é muito útil para a área de dados. Imagine que você recebe dados diariamente e, com isso, precisa tratá-los toda vez. Além disso, os modelos podem ficar desatualizados por motivos de Data Drift, que é uma alteração da distribuição dos dados, fazendo com que nossos modelos fiquem descalibrados para a nova previsão. Com o orquestrador, podemos deixar o código agendado em alguma máquina para que execute os scripts e adicione os dados à nossa base de dados. Além disso, podemos ter uma regularidade sobre quando retreinarmos os modelos, com uma função que verifique se o modelo está perdendo precisão, acurácia, etc.

    Exemplo de uma tarefa agendada

    Link do repositório

    Para rodar o código acima, basta ter o Docker e o Docker Compose instalados e executar os seguintes comandos:

    git clone https://github.com/mnsgrosa/prefect_mldump.git
    cd prefect_mldump
    docker compose up --build -d
    

    Vamos à parte que interessa: o que estamos orquestrando?

    Neste exemplo, estamos orquestrando o scraping de um subreddit (que pode ser alterado no script) e a adição ao banco de dados que, por motivos de simplicidade, fiz apenas com um CSV e uma API rápida no FastAPI, que será mostrada logo em seguida.

    /prefect_mldump/main/orchestrator/api_tool.py
    

    api_caller

    Ele utiliza a biblioteca httpx para fazer a requisição e recebe um JSON (ou um dicionário, para nós, usuários de Python). E basicamente é isso. Essa ferramenta é o retorno da classe responsável por extrair esse JSON, que será orquestrada para fazer o scraping a cada 2 horas.

    navegar

    Agora, a API é estruturada da seguinte maneira:

    Imports que serão relevantes:

    • fastapi: Biblioteca responsável pela criação do que chamamos de endpoints, que são os responsáveis por realizar a operação no banco de dados. Um material que indico muito é o curso FastAPI do Dunossauro.
    • typing: Tipagem dos objetos para que não haja inconsistência no FastAPI. O ideal seria utilizar o Pydantic, mas por motivos de velocidade, não o utilizei.
    • uvicorn: Lib responsável por fazer a API rodar.

    imports

    E agora, a criação dos endpoints:

    • Get Método que deve retornar objetos da base de dados. Neste caso, ele não recebe nada e retorna a base de dados inteira, mas poderia simplesmente retornar um registro do banco de dados ou algo específico que o usuário desejar. get

    • Post Método para adicionar informações à base de dados. Neste caso, existe um mini tratamento que fiz, mas poderia ser ainda mais simples, transformando os dados em uma lista de listas, sendo menos verboso.

    post

    Agora que foi explicada essa parte de API, podemos ir à parte da orquestração.

    Como o Prefect funciona

    Este orquestrador funciona com o que chamam de flows (fluxos). Cada fluxo tem tasks (tarefas). Essas tarefas são os “átomos” do que fazer, e os fluxos têm um limite do que pode ser realizado. Assim, o ideal é deixar o código o mais modularizável possível, por motivos de reutilização e para manter a organização proposta pelo Prefect. Outra maneira de ajustar isso seria a criação de um fluxo de fluxos, onde um fluxo principal pode receber múltiplos subfluxos para minimizar o impacto em cada um. Dito isso, o conceito que vou demonstrar envolve as tarefas e os flows que serão orquestrados para agendamento. Abaixo, um exemplo seguindo os padrões sugeridos pela própria Prefect:

    flow_modularizado

    E agora, um exemplo menos indicado:

    flow_nao_modularizado

    Repare nos decoradores (@task, @flow) acima das funções. Eles dão métodos adicionais à função que nos serão úteis para o deploy (implantação) do agendamento dos flows. Por fim, temos o último flow a ser criado: o de postar os dados obtidos da internet em nossa base de dados.

    post_flow

    Repare que agora a tarefa final retorna um objeto diferente: ele se chama State (estado) e ajuda a gerenciar o seguimento dos fluxos. Caso em algum momento um dos fluxos falhe, podemos alterar o que o orquestrador executará. Seguindo as boas práticas sugeridas, faremos então um fluxo de fluxos, já que um depende da saída do outro, ficando da seguinte maneira o fluxo final:

    full flow

    Antes de executarmos o deploy desse flow, precisamos criar o que chamam de work pool.

    Work Pool

    É o responsável por ligar a camada de orquestração à camada de infraestrutura, fazendo assim os flows serem executados na infra desejada. Neste caso, será na minha máquina, mas poderia ser implementado na AWS, GCP ou outra nuvem disponível. Link da documentação completa.

    Dito isso, como criamos o work pool? Pode ser feito de duas maneiras principais: executando o seguinte comando:

    prefect work-pool create [opcao] [nome_do_work_pool]
    

    E a maneira que fiz para o teste com o Docker Compose, com um script Python:

    work_pool

    Aqui, para criá-lo no CLI, utilizaríamos o seguinte comando:

    prefect work-pool create --type=docker my-docker-pool
    

    A flag --type=docker é necessária, pois o serviço que será utilizado é o Docker. Com o work pool criado, podemos “deployar” o agendamento e, após isso, acionamos o work pool.

    Deployando o fluxo de fluxos

    Agora, os métodos adicionais serão utilizados:

    deploy

    A função full_flow recebeu o método from_source(), que permite construir o flow a partir do arquivo .py original. Por fim, temos o .deploy(), que gera as principais informações para o deploy, sendo elas: nome do deploy, nome do work pool e o agendador (com sintaxe cron, que irei explicar no final deste post, mas, neste caso, ele passa a seguinte informação: “a cada 2 horas, no minuto 0”).

    Basta executar o script para criar o deploy no work pool fornecido. Agora, falta inicializar o work pool para agendar a execução do código.

    Inicializando o work pool

    Para que o work pool seja inicializado, basta executar o seguinte código no CLI:

    prefect worker start -p [nome_da_pool] [opcoes]
    

    No nosso caso, se traduz para:

    prefect worker start -p "my-docker-pool" --type "docker"
    

    Essa última opção (--type "docker") se deve ao fato de estarmos executando em um contêiner Docker. Caso estivesse rodando fora, na sua máquina, não precisaria dessa opção final. E pronto! O processo está agendado para executar periodicamente de acordo com o cron. Mas, no caso do projeto, o Docker Compose já faz tudo por você, então aqui vai como foi feito:

    Estruturação do projeto

    Fiz o projeto com o Docker Compose para que fosse mais fácil rodar os serviços de backend e do Prefect sem ter de abrir múltiplos terminais. Criei uma imagem customizada simples, sem muitas técnicas para deixá-la mais leve (como o uso de distroless), e nos serviços restantes utilizei a imagem oficial do Prefect com Python 3.12.

    Minha imagem customizada ficou da seguinte maneira:

    dockerfile

    E a invoco em dois serviços: o responsável pelo backend e o responsável por criar o work pool.

    Primeiro serviço: Backend

    Ele roda a imagem customizada que criei para executar o backend com uv.

    backend

    Segundo serviço: Flow Server

    Roda a imagem oficial do Prefect 3.12 e executa o comando para inicializar o servidor, onde teremos o dashboard para acompanharmos o andamento dos flows.

    flow_server

    Terceiro serviço: Work Pool Creator

    Também rodando a imagem que criei para criar o worker, mesmo sabendo que poderia ser executado via CLI com a imagem do Docker.

    work_pool_creator

    Quarto serviço: Flow Scheduler

    Utilizando a imagem customizada, roda os scripts para “deployar” os serviços após a criação do work pool especificado no script.

    flow_scheduler_yaml

    E, por fim, o último serviço: Flow Start

    Está utilizando a imagem customizada, pois a imagem base do Prefect não vem com o módulo do Docker, o que acaba tornando necessário o uso de:

    uv add "prefect[docker]"
    

    para adicioná-lo ao pyproject.toml.

    flow_start

    Visualizar o resultado

    Agora, basta acessar os seguintes links para ver o resultado: o backend aqui e o dashboard do orquestramento aqui.

    Considerações sobre o projeto

    Há muita margem para melhoria nesse projeto. A intenção era simplesmente mostrar o básico. É possível deixar os flows assíncronos, limitar a concorrência de processamento, etc. O meu projeto inicial utilizaria tudo isso, mas a API da qual eu estava pegando os dados climáticos foi desativada (ate o momento), o que me impediu de seguir com o projeto, apesar de ele já ter uma estrutura mais avançada.

    Explicando Cron

    Cron e um agendador de tarefas em sistemas operacionais unix e ele segue a seguinte semantica (minuto, hora, dia, mes, dia da semana) ou seja (* * * * *) siginifica todo minuto o asterisco representa sempre e */n acada n unidades sendo assim meu cron 0 */2 * * * representa: a cada duas horas

  • Meu agente MCP arxiv

    Meu agente MCP arxiv

    A ideia inicial

    Minha ideia inicial foi criar um agente com ferramentas para pegar títulos, introduções e GitHub do Papers with Code, que agora virou Trending Papers. Enquanto estava programando a transição para o ArXiv, lançaram o Trending Papers, então é uma possibilidade fazer para ele também. A ideia teve um ponto de partida que foi a criação de um grupo de papers da minha faculdade — achei que seria interessante me propor o desafio de criar um agente conversacional com ferramentas de scraping e base vetorial para busca pelo nome do tópico. O plano é aprimorar a ferramenta futuramente para acessar ferramentas internas, como deadlines de conferências ou eventos da faculdade.

    Fluxo do agente

    O fluxo funciona da seguinte maneira: o agente se conecta ao servidor MCP, que está ligado na porta 8000, e solicita as ferramentas ao servidor. Em seguida, ele passa a interpretar os prompts do usuário e, caso uma ferramenta seja solicitada, ele a executa e retorna uma resposta estruturada ao usuário.

    flow

    Como iniciar o agente

    Primeiro pegue a key da API Groq nesse link GROQ

    Basta clonar o repositorio

    git clone https://github.com/mnsgrosa/llm_arxiv.git
    

    Instalar a ferramenta docker e docker compose e em seguida entrar no diretorio e jogar o seguinte comando no terminal

    crie um .env contendo a seguinte linha

    GROQ_API_KEY = sua_key_aqui
    

    e rodar o comando abaixo

    docker compose up -d
    

    O dashboard fica localizado aqui Dashboard

    Link para o git do projeto

    Link para o git

    print

    Ferramentas utilizadas

    FastMCP, Httpx, BeautifulSoup4, Chromadb, Gradio, Langchain, LangGraph, Langchain_groq


    Leitura do site

    Para as requisições do ArXiv, utilizei o Httpx — uma ferramenta mais recente que a clássica requests do Python. Escolhi essa ferramenta por ter mais familiaridade, por indicação de colegas, e por sentir que é mais rápida. Para leitura da resposta do site, utilizei o BeautifulSoup4.

    Criação do agente

    Para criar o MCP, utilizei a biblioteca FastMCP e, para o agente em si, usei o Langchain e o Langchain_groq para ligar à API gratuita do modelo Llama3-8b-8192.


    Ideias por trás do código e o código

    • Como foi feito o scraping:
      O ArXiv tem uma página destinada a requisições por tópicos. Dito isso, é de fácil acesso com o Httpx, e as tags HTML também facilitam o processo por serem 'title', 'summary' e 'link', bastando chamar o método find_all() da biblioteca BeautifulSoup4. Então é uma questão de lógica de programação simples para salvar os itens.

    scraper.py

    • Criação das ferramentas do agente:
      Criei um arquivo chamado shared_paper_tools.py. Nesse arquivo, há uma classe que controla o que o agente irá fazer. Ao ser inicializada, ela cria 3 bases de dados utilizando o Chromadb. A principal, que é a de tópicos, é fundamental para localizar os itens nas outras bases, que são as que salvam os títulos e introduções de cada paper.
    1. scrape_arxiv_papers:
      Temos um método simples que usa o scraper e salva os itens na base de dados.
      scrap_func

    2. search_stored_papers:
      Esse método faz a LLM interpretar qual o tópico desejado pelo usuário e depois verifica se há algum tópico parecido no banco de dados.
      search_func

    3. get_or_scrape_papers:
      É um método mais versátil: se não encontrar na base de dados, ele irá chamar os dois métodos acima; se existir, só o método de pegar o paper.
      get_or_scrape

    4. list_available_topics:
      Verifica quais tópicos estão na base de dados vetorial.
      list_topic

    • MCP server:
      Aqui há um ponto crucial: cada código deve ter uma docstring bem feita para o agente. Ele vai se orientar pela docstring sobre o que fazer. Na docstring, deve-se explicar os parâmetros da função e uma descrição do que ela faz para, quando for pedida uma tarefa, o agente entender que é aquela ferramenta específica que você deseja. De resto, basta importar o objeto criado das ferramentas e chamá-los em cada função. Segue um exemplo de função feita — perceba o annotator acima dela.

    mcp_server

    • Agente:
      Algumas partes importantes serão citadas aqui, mas muitas partes serão puladas por ser um script muito grande. Primeiramente, os imports:
      imports
      Aqui, gostaria de falar sobre alguns imports importantes para criar o MCP, sendo eles o AgentExecutor e create_tool_calling_agent da linha 7 e load_mcp_tools e create_react_agent das linhas 13 e 14. Essas são as classes responsáveis por transformar a LLM em um agente. Como mostrado nos métodos abaixo:

      agent_creation

      Aqui temos a inicialização do client sse, que é o método de conexão com o servidor MCP. Outra parte importante é o prompt base do agente: ele é quem vai guiar a LLM para tomar decisões. Em seguida, temos a criação do agente com create_tool_calling_agent, dando wrapping em AgentExecutor, que permite a execução das ferramentas. E, por fim, a função que permite a conversação com o agente, salvando as mensagens na memória.

      chat

      Por fim, não é necessário fazer um loop devido ao fato que o streamlit naturalmente esta loopando e com isso zerando o que nao for posto no cache do codigo com as seguintes linhas:

      app


    Próximos passos

    Os próximos passos são criar algumas ferramentas adicionais que seriam úteis para minha faculdade (CIN/UFPE).