fr

Mistral Batch API : traitement asynchrone à moitié prix

Guide complet et sourcé du Batch API Mistral : flux de bout en bout, format JSONL, gestion des erreurs, inline vs file-based, endpoints supportés et pièges connus.

Le Batch API de Mistral permet d’envoyer des milliers — voire des millions — de requêtes de façon asynchrone, avec une réduction de 50 % sur le coût des tokens par rapport à l’API synchrone. Le principe : on dépose un fichier de requêtes, on attend que le job se termine, on récupère les résultats. Aucune réponse en temps réel, mais un débit bien supérieur et un coût deux fois moindre.

Lancé en novembre 2024, le Batch API est disponible pour tous les utilisateurs de La Plateforme — plan gratuit inclus, avec les limitations de tiers habituelles.

Source : Mistral Batch API announcement


1. Quand utiliser le Batch API

Le Batch API est le bon choix dès que la réponse immédiate n’est pas nécessaire :

Cas d’usagePourquoi le batch
Résumé ou traduction de corpus documentaireÉconomie de 50 %, pas d’utilisateur qui attend
Génération d’embeddings pour un index de rechercheCorpus entier traité en une nuit
Pipeline d’annotation / data labelingDataset fixe, résultats consommés en agrégat
Analyse de sentiment sur exports CRMTraitement périodique (nightly, weekly)
OCR de factures ou reçus en masseTraitement fin de journée
LLM-as-a-judge sur un eval setDes centaines de cas de test en une passe
Génération de données synthétiquesDonnées d’entraînement générées en bulk
Transcription audio de réunions enregistréesPost-traitement, pas de contrainte temps réel

À l’inverse, l’API synchrone reste obligatoire pour tout ce qui est interactif : chatbot, complétion de code, streaming, agents multi-tour.


2. Architecture et flux

Le cycle de vie d’un job batch se décompose en quatre étapes :

┌──────────────────┐
│ 1. Upload JSONL  │  POST /v1/files
│    (optionnel)   │
└────────┬─────────┘
         │  file_id

┌──────────────────┐
│ 2. Créer le job  │  POST /v1/batch/jobs
└────────┬─────────┘
         │  job_id

┌──────────────────┐
│ 3. Poller le     │  GET /v1/batch/jobs/{job_id}
│    statut        │
└────────┬─────────┘
         │  output_file_id

┌──────────────────┐
│ 4. Télécharger   │  GET /v1/files/{file_id}/content
│    les résultats │
└──────────────────┘

L’étape 1 est optionnelle pour les petits jobs (< 10 000 requêtes) : on peut envoyer les requêtes directement dans le corps du job (inline batch).

Source : Mistral Batch Inference Docs


3. Format JSONL d’entrée

Chaque ligne du fichier .jsonl est un objet JSON autonome avec deux champs :

{"custom_id": "req-001", "body": {"max_tokens": 200, "messages": [{"role": "user", "content": "Quelle est la capitale de la France ?"}]}}
{"custom_id": "req-002", "body": {"max_tokens": 200, "messages": [{"role": "user", "content": "Explique le théorème de Pythagore en une phrase."}]}}
ChampTypeRequisDescription
custom_idstringOuiIdentifiant unique défini par l’appelant. Sert à corréler input et output.
bodyobjectOuiLe corps de la requête, exactement comme on l’enverrait à l’endpoint synchrone.

Le contenu de body varie selon l’endpoint cible. Pour /v1/embeddings :

{
  "custom_id": "doc-42",
  "body": { "input": "Texte à encoder.", "encoding_format": "float" }
}

Pour /v1/ocr :

{
  "custom_id": "facture-001",
  "body": {
    "document": {
      "type": "image_url",
      "image_url": "data:image/jpeg;base64,/9j/..."
    },
    "include_image_base64": true
  }
}

Contraintes :

  • Taille maximale du fichier : 512 Mo
  • Maximum 1 000 000 requêtes par job en mode fichier
  • Un seul modèle par job (pas de mix de modèles)

4. Étape 1 — Upload du fichier

from mistralai import Mistral

client = Mistral(api_key="YOUR_API_KEY")

with open("batch_input.jsonl", "rb") as f:
    batch_file = client.files.upload(
        file={"file_name": "batch_input.jsonl", "content": f},
        purpose="batch"  # obligatoire : "batch", "fine-tune" ou "ocr"
    )

print(batch_file.id)       # "file-abc123"
print(batch_file.num_lines) # nombre de requêtes dans le fichier

Le champ purpose est obligatoire et doit valoir "batch" pour un job batch. Le fichier uploadé obtient un sample_type de "batch_request" en interne.

Source : Files API Reference


5. Étape 2 — Création du job

Mode fichier

job = client.batch.jobs.create(
    input_files=[batch_file.id],          # liste d'IDs de fichiers
    model="mistral-small-latest",         # un seul modèle par job
    endpoint="/v1/chat/completions",
    metadata={"project": "extraction-factures", "version": "1.0"},
    timeout_hours=24                       # défaut : 24, max : 168 (7 jours)
)

print(job.id)      # "batch-xyz789"
print(job.status)  # "QUEUED"

Mode inline (sans upload)

Pour moins de 10 000 requêtes, pas besoin de fichier :

job = client.batch.jobs.create(
    requests=[
        {
            "custom_id": "0",
            "body": {
                "max_tokens": 128,
                "messages": [{"role": "user", "content": "Meilleur fromage français ?"}]
            }
        },
        {
            "custom_id": "1",
            "body": {
                "max_tokens": 128,
                "messages": [{"role": "user", "content": "Capitale du Japon ?"}]
            }
        }
    ],
    model="mistral-small-latest",
    endpoint="/v1/chat/completions"
)

Comparaison inline vs file-based

DimensionInlineFile-based
Max requêtes< 10 000jusqu’à 1 000 000
Upload requisNonOui (purpose="batch")
Paramètre d’entréerequests=[...]input_files=[file_id]
Plusieurs fichiersNonOui
Idéal pourPrototypage, petits jobsProduction à grande échelle

6. Étape 3 — Polling du statut

import time

terminal_statuses = {"SUCCESS", "FAILED", "TIMEOUT_EXCEEDED", "CANCELLED"}

while job.status not in terminal_statuses:
    time.sleep(5)
    job = client.batch.jobs.get(job_id=job.id)
    print(
        f"{job.status} | "
        f"{job.succeeded_requests + job.failed_requests}"
        f"/{job.total_requests}"
    )

Machine d’états

QUEUED → RUNNING → SUCCESS
                 → FAILED
                 → TIMEOUT_EXCEEDED

QUEUED / RUNNING → CANCELLATION_REQUESTED → CANCELLED
StatutSignification
QUEUEDJob accepté, en attente d’un slot de traitement
RUNNINGRequêtes en cours de traitement
SUCCESSTraitement terminé, fichier de résultats disponible
FAILEDÉchec au niveau du job (pas des requêtes individuelles)
TIMEOUT_EXCEEDEDDépassement du timeout_hours
CANCELLATION_REQUESTEDAnnulation demandée, en cours de fin
CANCELLEDAnnulé

Un job peut atteindre SUCCESS même si certaines requêtes individuelles ont échoué — succeeded_requests + failed_requests peut être inférieur à total_requests si le job a expiré.


7. Étape 4 — Récupération des résultats

if job.status == "SUCCESS":
    output_bytes = client.files.download(file_id=job.output_file).read()
    output_text = output_bytes.decode("utf-8")

    for line in output_text.strip().split("\n"):
        result = json.loads(line)
        cid = result["custom_id"]

        if result.get("error"):
            print(f"[{cid}] ERREUR : {result['error']}")
            continue

        content = result["response"]["body"]["choices"][0]["message"]["content"]
        print(f"[{cid}] {content[:100]}")

Format de sortie JSONL

Chaque ligne correspond à une requête d’entrée, corrélée via custom_id :

{
  "id": "cmpl-abc123",
  "custom_id": "req-001",
  "response": {
    "status_code": 200,
    "body": {
      "id": "cmpl-abc123",
      "object": "chat.completion",
      "model": "mistral-small-latest",
      "choices": [
        {
          "index": 0,
          "message": {
            "role": "assistant",
            "content": "Paris est la capitale de la France."
          },
          "finish_reason": "stop"
        }
      ],
      "usage": {
        "prompt_tokens": 18,
        "completion_tokens": 12,
        "total_tokens": 30
      }
    }
  },
  "error": null
}

L’ordre des lignes dans le fichier de sortie ne correspond pas forcément à l’ordre du fichier d’entrée. Toujours se baser sur custom_id pour reconstituer la correspondance.


8. Gestion des erreurs

Il y a deux niveaux d’erreur distincts :

Erreurs au niveau du job : le champ status passe à FAILED et le tableau errors de l’objet job contient les détails.

Erreurs au niveau des requêtes individuelles : le job peut tout de même atteindre SUCCESS avec un error_file. Le failed_requests indique combien ont échoué.

if job.status == "SUCCESS" and job.failed_requests > 0 and job.error_file:
    err_bytes = client.files.download(file_id=job.error_file).read()
    for line in err_bytes.decode("utf-8").strip().split("\n"):
        err = json.loads(line)
        print(f"Échec [{err['custom_id']}] : {err.get('error')}")
        # err["response"]["status_code"] contient le code HTTP (4xx ou 5xx)

Le fichier d’erreur a la même structure JSONL que le fichier de sortie, avec le champ error renseigné et response.status_code à 4xx ou 5xx.


9. Exemple complet de bout en bout

import json
import time
from mistralai import Mistral

client = Mistral(api_key="YOUR_API_KEY")

# --- Préparation du fichier JSONL ---
questions = [
    "Quelle est la capitale de l'Allemagne ?",
    "Explique la relativité restreinte en deux phrases.",
    "Donne trois exemples de frameworks Python.",
]

requests_data = [
    {
        "custom_id": str(i),
        "body": {
            "max_tokens": 128,
            "messages": [{"role": "user", "content": q}]
        }
    }
    for i, q in enumerate(questions)
]

jsonl_bytes = b"\n".join(json.dumps(r).encode() for r in requests_data)

# --- Upload ---
batch_file = client.files.upload(
    file={"file_name": "questions.jsonl", "content": jsonl_bytes},
    purpose="batch"
)
print(f"Fichier uploadé : {batch_file.id}")

# --- Création du job ---
job = client.batch.jobs.create(
    input_files=[batch_file.id],
    model="mistral-small-latest",
    endpoint="/v1/chat/completions",
    metadata={"projet": "demo"},
    timeout_hours=24
)
print(f"Job créé : {job.id}")

# --- Polling ---
terminal_statuses = {"SUCCESS", "FAILED", "TIMEOUT_EXCEEDED", "CANCELLED"}

while job.status not in terminal_statuses:
    time.sleep(5)
    job = client.batch.jobs.get(job_id=job.id)
    print(f"{job.status} | {job.succeeded_requests}/{job.total_requests}")

# --- Résultats ---
if job.status == "SUCCESS":
    output = client.files.download(file_id=job.output_file).read().decode("utf-8")

    for line in output.strip().split("\n"):
        result = json.loads(line)
        if result.get("error"):
            print(f"[{result['custom_id']}] ERREUR : {result['error']}")
        else:
            content = result["response"]["body"]["choices"][0]["message"]["content"]
            print(f"[{result['custom_id']}] {content}")

    if job.failed_requests > 0 and job.error_file:
        err_output = client.files.download(file_id=job.error_file).read().decode("utf-8")
        for line in err_output.strip().split("\n"):
            err = json.loads(line)
            print(f"ÉCHEC [{err['custom_id']}] : {err.get('error')}")

elif job.status == "FAILED":
    print(f"Job échoué : {job.errors}")
elif job.status == "TIMEOUT_EXCEEDED":
    print(f"Timeout dépassé après {job.completed_requests} requêtes complétées.")

10. Endpoints supportés

EndpointUsage
/v1/chat/completionsGénération de texte, vision multimodale
/v1/embeddingsEncodage vectoriel
/v1/fim/completionsComplétion de code fill-in-the-middle (Codestral)
/v1/moderationsClassification de contenu
/v1/chat/moderationsModération de conversations
/v1/ocrReconnaissance optique de caractères
/v1/classificationsClassification custom
/v1/chat/classificationsClassification conversationnelle
/v1/conversationsGestion de conversations
/v1/audio/transcriptionsTranscription audio (Voxtral)

Le modèle utilisé doit être compatible avec l’endpoint choisi — on ne peut pas envoyer des requêtes /v1/audio/transcriptions avec mistral-small-latest.

Source : Mistral Batch Capabilities Docs


11. Tarification et limites

Tarification

Le Batch API applique une réduction de 50 % sur le prix des tokens par rapport à l’API synchrone pour tous les modèles. Le prix est publié sur mistral.ai/pricing — la colonne batch vaut toujours la moitié du tarif synchrone.

Le Batch API n’est disponible que via La Plateforme (api.mistral.ai). Les déploiements cloud partenaires (AWS, GCP, Azure) ne le supportent pas.

Limites opérationnelles

ParamètreValeur
Réduction tarifaire50 % vs API synchrone
Max requêtes / job (fichier)1 000 000
Max requêtes / job (inline)< 10 000
Taille max du fichier d’entrée512 Mo
Timeout par défaut24 heures
Timeout maximum (timeout_hours)168 heures (7 jours)
Requêtes en cours simultanées / workspace1 000 000
Rétention des fichiers résultatsPas d’expiration automatique
Expiration des signed URLs24 heures (configurable)

Les limites batch sont indépendantes des rate limits de l’API synchrone (RPS, tokens/minute). Les deux pools ne partagent pas de quota.

Source : Rate Limits & Usage Tiers


12. Pièges et limitations connues

Les résultats ne sont pas dans l’ordre d’entrée

Le fichier de sortie JSONL ne préserve pas l’ordre du fichier d’entrée. Toujours reconstruire la correspondance via custom_id, jamais par position de ligne.

# Anti-pattern
results = [json.loads(l) for l in output.strip().split("\n")]
# results[0] ne correspond pas forcément à la requête 0

# Correct
results_by_id = {}
for line in output.strip().split("\n"):
    r = json.loads(line)
    results_by_id[r["custom_id"]] = r

Un modèle récent peut ne pas être disponible immédiatement en batch

La documentation Mistral indique que le Batch API est disponible pour “tous les modèles sur La Plateforme”, mais en pratique, les modèles très récents (quelques jours après leur sortie) peuvent retourner une erreur “model not available” sur l’endpoint batch. Le déploiement sur l’infrastructure batch est distinct du déploiement sur l’API synchrone.

En pratique, attendre quelques jours ou utiliser l’alias *-latest est la solution de contournement jusqu’à ce que le modèle soit pleinement disponible en batch. En cas de doute, contacter le support Mistral.

Un seul modèle par job

On ne peut pas mixer plusieurs modèles dans un même job. Pour comparer les sorties de deux modèles sur le même dataset, il faut créer deux jobs séparés avec le même fichier d’entrée.

SUCCESS ne signifie pas 100 % de réussite

Un job peut atteindre SUCCESS avec des requêtes individuelles en échec. Toujours vérifier failed_requests et récupérer le error_file si nécessaire.

Timeout et requêtes partiellement traitées

Si le job atteint TIMEOUT_EXCEEDED, les requêtes déjà traitées sont disponibles dans le fichier de sortie. On peut reconstruire les requêtes non traitées en comparant les custom_id du fichier d’entrée avec ceux du fichier de sortie et soumettre un nouveau job.


13. Gestion des jobs et des fichiers

# Lister les jobs en cours
jobs = client.batch.jobs.list(
    status="RUNNING",
    page=0,
    page_size=20
)

# Annuler un job
client.batch.jobs.cancel(job_id=job.id)

# Supprimer un fichier uploadé
client.files.delete(file_id=batch_file.id)

# Obtenir une URL signée (expire après 24h par défaut)
url_info = client.files.get_signed_url(file_id=job.output_file)
print(url_info.url)

La console offre également une interface visuelle pour gérer fichiers et jobs :

  • Fichiers : https://console.mistral.ai/build/files
  • Jobs batch : https://console.mistral.ai/build/batches

Conclusion

Le Batch API est un outil simple et efficace pour tout traitement LLM qui n’a pas de contrainte de latence. Le flux est minimal (upload → create → poll → download), le SDK Python est bien construit, et l’économie de 50 % est substantielle à l’échelle.

Les points à retenir :

  • Toujours corréler via custom_id, pas par position
  • Vérifier failed_requests même quand status == "SUCCESS"
  • Pour les très gros volumes, privilégier le mode fichier avec plusieurs fichiers si nécessaire
  • Les modèles fraîchement sortis peuvent avoir un délai avant d’être disponibles en batch

Documentation officielle · API Reference · Pricing