P
imvdmolen.nl
Blog

Hoe ik API response transformatie implementeer met PHP's stream handling voor grote datasets

Bij integratieprojecten waarbij dagelijks grote hoeveelheden data van externe API's moeten worden verwerkt loopt het snel mis met geheugen. Een export van twee miljoen productrecords ophalen via de standaard file_get_contents() aanpak resulteert vrijwel direct in een memory fatal error. Zelfs met ini_set('memory_limit', '2G') blijft het systeem vastlopen. Stream handling is in zulke gevallen de enige werkende aanpak.

Streams in PHP stellen je in staat om data regel voor regel of blok voor blok te verwerken, in plaats van alles in één keer in het geheugen te laden. Bij API responses betekent dit dat je de data kunt transformeren terwijl deze binnenkomt, zonder ooit de volledige response in het geheugen te hoeven opslaan. Deze aanpak heeft mij al meerdere keren gered bij projecten met grote datasets.

Stream context voor API authenticatie

Een stream context in PHP werkt prima voor API calls die authenticatie vereisen. Ik configureer altijd eerst mijn HTTP headers en authenticatie voordat ik de stream open. Het voordeel van deze aanpak is dat je volledige controle hebt over de HTTP request parameters zonder externe dependencies zoals cURL.

<?php

class ApiStreamProcessor 
{
    private $apiKey;
    private $baseUrl;
    
    public function __construct($apiKey, $baseUrl) 
    {
        $this->apiKey = $apiKey;
        $this->baseUrl = $baseUrl;
    }
    
    private function createStreamContext() 
    {
        return stream_context_create([
            'http' => [
                'method' => 'GET',
                'header' => [
                    'Authorization: Bearer ' . $this->apiKey,
                    'Accept: application/json',
                    'Content-Type: application/json'
                ],
                'timeout' => 300
            ]
        ]);
    }
    
    public function processLargeDataset($endpoint) 
    {
        $url = $this->baseUrl . $endpoint;
        $context = $this->createStreamContext();
        
        $stream = fopen($url, 'r', false, $context);
        
        if (!$stream) {
            throw new Exception('Unable to open stream');
        }
        
        return $this->transformStream($stream);
    }
}

De timeout van 300 seconden zet ik bewust hoog omdat grote API responses tijd nodig hebben om volledig binnen te komen. Stream contexts geven je ook toegang tot HTTP response headers via stream_get_meta_data(), wat handig is voor het controleren van status codes en response headers voordat je begint met verwerken.

Regel-voor-regel transformatie van JSON streams

JSON streams zijn een uitdaging omdat je niet zomaar elke regel als valide JSON kunt parsen. API's leveren vaak JSON in verschillende formaten: soms één groot object, soms JSON Lines format, soms een array met objecten. Ik heb een strategie ontwikkeld die werkt voor de meeste scenario's door eerst het formaat te detecteren.

private function transformStream($stream) 
{
    $buffer = '';
    $results = [];
    $inObject = false;
    $objectDepth = 0;
    
    while (!feof($stream)) {
        $chunk = fread($stream, 8192);
        $buffer .= $chunk;
        
        // Process complete JSON objects from buffer
        while (($pos = $this->findCompleteJsonObject($buffer)) !== false) {
            $jsonString = substr($buffer, 0, $pos);
            $buffer = substr($buffer, $pos);
            
            $decoded = json_decode($jsonString, true);
            
            if ($decoded !== null) {
                $transformed = $this->transformObject($decoded);
                if ($transformed) {
                    $results[] = $transformed;
                }
                
                // Yield results in batches to avoid memory buildup
                if (count($results) >= 100) {
                    yield $results;
                    $results = [];
                }
            }
        }
    }
    
    // Handle remaining buffer
    if (!empty($buffer)) {
        $decoded = json_decode($buffer, true);
        if ($decoded !== null) {
            $transformed = $this->transformObject($decoded);
            if ($transformed) {
                $results[] = $transformed;
            }
        }
    }
    
    if (!empty($results)) {
        yield $results;
    }
    
    fclose($stream);
}

Het gebruik van generators met yield is cruciaal hier. Hierdoor kan de calling code de getransformeerde data direct verwerken zonder dat alles in het geheugen opgebouwd wordt. Bij een dataset van 2 miljoen records scheelt dit enorm in memory usage.

Deze buffer-gebaseerde aanpak zorgt ervoor dat ik ook met incomplete JSON chunks kan werken. De findCompleteJsonObject() methode detecteert wanneer een volledig JSON object beschikbaar is in de buffer door accolades te tellen en escape characters te respecteren.

Foutafhandeling tijdens streaming

Streaming introduceert nieuwe uitdagingen voor error handling. Netwerk timeouts, incomplete data, of API rate limits kunnen midden in een stream optreden. Daarom implementeer ik altijd retry logic met exponential backoff voor streams. Dit voorkomt dat een groot download proces volledig mislukt door een tijdelijke netwerkfout.

private function streamWithRetry($url, $context, $maxRetries = 3) 
{
    $attempt = 0;
    
    while ($attempt <= $maxRetries) {
        try {
            $stream = fopen($url, 'r', false, $context);
            
            if (!$stream) {
                throw new Exception('Failed to open stream');
            }
            
            // Check HTTP response code
            $meta = stream_get_meta_data($stream);
            $httpCode = $this->extractHttpCode($meta);
            
            if ($httpCode >= 400) {
                fclose($stream);
                throw new Exception("HTTP Error: {$httpCode}");
            }
            
            return $stream;
            
        } catch (Exception $e) {
            $attempt++;
            
            if ($attempt > $maxRetries) {
                throw new Exception("Max retries exceeded: " . $e->getMessage());
            }
            
            // Exponential backoff
            $delay = pow(2, $attempt - 1);
            sleep($delay);
            
            error_log("Stream attempt {$attempt} failed, retrying in {$delay}s: " . $e->getMessage());
        }
    }
}

private function extractHttpCode($meta) 
{
    if (isset($meta['wrapper_data'])) {
        foreach ($meta['wrapper_data'] as $header) {
            if (preg_match('/HTTP\/\d\.\d\s+(\d+)/', $header, $matches)) {
                return intval($matches[1]);
            }
        }
    }
    return 200; // Assume success if no HTTP code found
}

Rate limiting afhandeling is ook essentieel bij streaming. Veel API's sturen een Retry-After header wanneer je de rate limit bereikt. Ik parse deze header en pas daar mijn retry interval op aan in plaats van de standaard exponential backoff te gebruiken.

Performance monitoring en geheugengebruik

Tijdens het ontwikkelen van streaming solutions monitor ik altijd het geheugengebruik om zeker te weten dat mijn implementatie daadwerkelijk memory-efficiënt is. PHP's memory_get_usage() en memory_get_peak_usage() zijn onmisbare tools hiervoor. Ook log ik de processing snelheid om bottlenecks te identificeren.

public function processWithMonitoring($endpoint) 
{
    $startTime = microtime(true);
    $startMemory = memory_get_usage();
    $recordCount = 0;
    
    foreach ($this->processLargeDataset($endpoint) as $batch) {
        $batchSize = count($batch);
        $recordCount += $batchSize;
        
        // Process the batch (save to database, send to queue, etc.)
        $this->processBatch($batch);
        
        // Memory monitoring
        $currentMemory = memory_get_usage();
        $peakMemory = memory_get_peak_usage();
        
        $elapsed = microtime(true) - $startTime;
        $recordsPerSecond = $recordCount / $elapsed;
        
        error_log(sprintf(
            "Processed %d records (%.2f/sec) | Current: %.2fMB | Peak: %.2fMB",
            $recordCount,
            $recordsPerSecond,
            $currentMemory / 1024 / 1024,
            $peakMemory / 1024 / 1024
        ));
        
        // Optional: Add circuit breaker for memory usage
        if ($currentMemory > 512 * 1024 * 1024) { // 512MB limit
            throw new Exception("Memory usage exceeded safe limits");
        }
    }
    
    $totalTime = microtime(true) - $startTime;
    error_log("Stream processing completed: {$recordCount} records in {$totalTime}s");
}

Circuit breakers voor geheugengebruik zijn een veiligheidsmaatregel die ik altijd inbouw. Zelfs bij goed geïmplementeerd streaming kunnen memory leaks optreden, bijvoorbeeld door het accumuleren van error logs of door third-party libraries die intern data cachen. Een memory limit voorkomt dat dit je server kan crashen.

Het gebruik van stream handling voor API response transformatie heeft mijn workflow enorm verbeterd. Projecten die voorheen onmogelijk waren door memory constraints zijn nu routineklusjes geworden. Stream processing opent ook de deur naar real-time data transformatie, waarbij je data kunt verwerken en doorsturen terwijl deze nog binnenkomt. Dit principe gebruik ik ook bij ETL processen waar ik data van de ene API naar de andere doorstuur zonder tussenopslag.