Building a Firehose
Ian Barber
May 24, 2012
My talk about dealing with streaming data in PHP, from PHP Tek 12.
Transcript
ian barber - @ianbarber - https://github.com/ianbarber/Firehose-PHP-Talk
BUILDING A FIREHOSE
FILTERABLE REAL TIME STREAMING DATA
SELLING DATA ANALYSIS & DECISIONS USER TOOLS $£¥ ☑☒
DATA SOURCES COMPOSE latency AUGMENT STORE FILTER STREAM
EVENT SAMPLE order tweet temperature snapshot
Data Source Data Source Data Source Output
Data Source Data Source Data Source Output Output
Data Source Data Source Data Source Output Messaging Batch HTTP
Logs HTTP Chunked Websockets Batched POST
APACHE PHP APACHE PHP NODE.JS PUSH ZEROMQ PULL HTTP POST
WEBSOCKETS
APACHE PHP APACHE PHP HTTP POST
location.php
function sendPos() {
  navigator.geolocation.getCurrentPosition(function (pos) {
    $.ajax({
      type: 'POST',
      url: 'http://firehose.com/input.php',
      data: {lat: pos.coords.latitude, lon: pos.coords.longitude}
    });
  });
  setTimeout(sendPos, 60000);
}
sendPos();
APACHE PHP APACHE PHP PUSH ZEROMQ WEBSOCKETS
input.php
$ctx = new ZMQContext();
$sock = $ctx->getSocket(ZMQ::SOCKET_PUSH);
$sock->connect("tcp://localhost:5566");
$data = array(
    'id'  => get_next_msg_id(),
    'uid' => $_COOKIE['uid'],
    'lat' => $_POST['lat'],
    'lon' => $_POST['lon']
);
$sock->send(json_encode($data));
APACHE PHP APACHE PHP NODE.JS ZEROMQ PULL WEBSOCKETS
output.js
// handler is the HTTP request handler, not shown on the slide
var app = require('http').createServer(handler),
    io = require('socket.io').listen(app),
    zmq = require('zmq'),
    sock = zmq.socket('pull');
app.listen(8080);
sock.bind('tcp://*:5566');
sock.on('message', function (msg) {
  var data = JSON.parse(msg);
  // send to all clients
  io.sockets.emit("position", data);
});
PHP DAEMON PHP DAEMON NODE.JS PUSH ZEROMQ PULL HTTP STREAM WEBSOCKETS
twitter.php
$fh = fopen("https://".$user.":".$pass."@stream.twitter.com/1/statuses/filter.json?track=".$search, "r");
while (!feof($fh)) {
    $d = fgets($fh);
    if (strlen($d) > 4) {
        $sock->send($d);
    }
}
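The read loop in twitter.php can be tried without a network connection. The following is a minimal sketch, assuming the stream is newline-delimited JSON with short keep-alive lines in between. read_stream_lines() is a hypothetical helper that is not part of the talk's code, and an in-memory stream stands in for the HTTP connection.

```php
<?php
// Sketch only: read_stream_lines() is a hypothetical helper, not from the talk.
// It reads newline-delimited records from any stream resource and skips
// short keep-alive lines, mirroring the strlen($d) > 4 check on the slide.
function read_stream_lines($fh, callable $onRecord, $minLength = 4)
{
    $count = 0;
    while (!feof($fh)) {
        $line = fgets($fh);
        if ($line === false) {
            break; // read error or end of stream
        }
        if (strlen($line) > $minLength) {
            $onRecord(rtrim($line, "\r\n"));
            $count++;
        }
    }
    return $count;
}

// Usage: an in-memory stream stands in for the streaming HTTP response.
$fh = fopen('php://memory', 'r+');
fwrite($fh, "{\"text\":\"hello\"}\r\n\r\n{\"text\":\"world\"}\r\n");
rewind($fh);

$records = array();
$seen = read_stream_lines($fh, function ($record) use (&$records) {
    $records[] = json_decode($record, true);
});
fclose($fh);
```

In the daemon on the slide, the callback would push each record onto the ZeroMQ socket instead of collecting it in an array.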
Data Source Data Source Output Assemble Process Process
SOURCE ASSEMBLE PHP PHP ZEROMQ PUB SUB SUB SUB REDIS
ZEROMQ PUSH
SOURCE PHP ZEROMQ PUB SUB REDIS
augmentor.php
$ctx = new ZMQContext();
$sub = $ctx->getSocket(ZMQ::SOCKET_SUB);
$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
$sub->connect("tcp://localhost:5577");
while ($dat = $sub->recv()) {
    // decode once so the message id can be used as the Redis key
    $msg = json_decode($dat, true);
    $aug = augment($msg, $obj);
    $redis->lpush($msg['id'], json_encode($aug));
}
SOURCE PHP REDIS DB
starbucks.php
$mongo = new Mongo();
$collection = $mongo->starbucks->locations;
function augment($data, $collection) {
    $loc = array((float) $data['lon'], (float) $data['lat']);
    $res = $collection->findOne(array(
        'loc' => array('$near' => $loc)));
    return array('name' => 'starbucks', 'val' => $res['street']);
}
SOURCE PHP REDIS
langdetect.php
$ld = new Text_LanguageDetect();
$ld->setNameMode(2);
function augment($data, $ld) {
    /* ["en"]=> float(0.24702222222222) */
    $names = $ld->detect($data['text'], 1);
    return array('name' => 'lang', 'val' => key($names));
}
SOURCE ASSEMBLE PHP PHP REDIS PHP ZOOKEEPER COUNT OF SERVICES
$zk = new Zookeeper();
$zk->connect("localhost:2181");
PHP ZOOKEEPER
augmentor.php
$zk->create(
    $path . "/" . uniqid(),
    NULL,
    array(array(
        "perms"  => Zookeeper::PERM_ALL,
        "scheme" => "world",
        "id"     => "anyone")),
    Zookeeper::EPHEMERAL);
REASSEMBLE SOURCE REDIS ZOOKEEPER COUNT
define("TIMEOUT", 5);
$ch = $zk->getChildren("/services");
$servs = count($ch);
REASSEMBLE COUNT
reassemble.php
while ($raw = $sub->recv()) {
    $dat = json_decode($raw, true);
    $dat['aug'] = array();
    $time = TIMEOUT; // time budget per message, in seconds
    do {
        $start = microtime(true);
        $aug = $redis->brpop($dat['id'], $time);
        if (count($aug)) $dat['aug'][] = $aug;
        $time -= microtime(true) - $start;
    } while ($time > 0 && count($dat['aug']) != $servs);
    $out->send(json_encode($dat)); // forward
}
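The reassembly loop waits for one result per registered service but gives up once a time budget is spent. Here is a minimal sketch of that pattern on its own, assuming a blocking pop that returns null when nothing arrives. collect_until() and the fake $pop closure are hypothetical, and the sample values are made up; an array stands in for the Redis list.

```php
<?php
// Sketch only: collect_until() is a hypothetical helper, not from the talk.
// It gathers up to $expected items, or stops when $timeout seconds have passed.
function collect_until(callable $pop, $expected, $timeout)
{
    $results = array();
    $deadline = microtime(true) + $timeout;
    while (count($results) < $expected) {
        $remaining = $deadline - microtime(true);
        if ($remaining <= 0) {
            break; // out of time: forward whatever has arrived
        }
        $item = $pop($remaining);
        if ($item !== null) {
            $results[] = $item;
        }
    }
    return $results;
}

// Fake queue holding two results, standing in for a blocking Redis pop.
$queue = array('{"name":"lang","val":"en"}', '{"name":"starbucks","val":"Main St"}');
$pop = function ($wait) use (&$queue) {
    if (count($queue) === 0) {
        usleep((int) (min($wait, 0.01) * 1000000)); // simulate a short blocking wait
        return null;
    }
    return array_shift($queue);
};

// Three services are expected but only two answer, so the timeout ends the wait.
$got = collect_until($pop, 3, 0.05);
```

Tracking a single deadline rather than subtracting elapsed time on each pass keeps the budget correct even if one pop returns early.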
Data Source Data Source Output Assemble Process Process Filter Filter
Filter
FILTER ELASTIC SEARCH QUERY - NAME QUERY - NAME MSG
? ? MSG ZEROMQ SUB ZEROMQ PUB TOPIC MSG TOPIC MSG TOPIC MSG ZEROMQ PULL QUERY - NAME MSG HTTP / REST
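ELASTIC SEARCH QUERY - NAME QUERY - NAME MSG MSG HTTP / REST
elasticsearch.php
function escall($server, $path, $param) {
    // $param holds the HTTP stream context options (e.g. 'content')
    $context = stream_context_create(array('http' => $param));
    $result = file_get_contents($server.'/'.$path, NULL, $context);
    // decode to an associative array so callers can read $match['matches']
    return json_decode($result, true);
}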
ELASTIC SEARCH QUERY - NAME QUERY - NAME MSG MSG HTTP / REST
elasticsearch.php
function percolate($host, $tweet) {
    $path = "/twitter/tweet/_percolate";
    $tweet = array('doc' => array('tweet' => $tweet['text']));
    $match = escall($host, $path, array('content' => json_encode($tweet)));
    return $match['matches'];
}
FILTER MSG ZEROMQ PULL QUERY - NAME ZEROMQ SUB
elasticsearch.php
// snip... creating in, ctl, out ZMQ socks
$poll = new ZMQPoll();
$poll->add($in, ZMQ::POLL_IN);
$poll->add($ctl, ZMQ::POLL_IN);
$read = $write = array();
elasticsearch.php
while (true) {
    $ev = $poll->poll($read, $write, -1);
    if ($read[0] === $in) {
        $raw = $in->recv();
        $msg = json_decode($raw, true);
        $matches = percolate($host, $msg);
        foreach ($matches as $match) {
            // topic frame first, then the original JSON message
            $out->sendMulti(array($match, $raw));
        }
    } else if ($read[0] === $ctl) {
        $q = json_decode($ctl->recv(), true);
        $name = $q['name'];
        $query = $q['query'];
        add_query($host, $name, $query);
    }
}
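Percolation inverts the usual search: queries are stored under a name, and each incoming message is checked against them to find which names it matches. The sketch below illustrates the idea only, assuming queries are plain keywords. KeywordPercolator is a hypothetical in-memory stand-in, not the Elasticsearch API used in the talk.

```php
<?php
// Sketch only: a hypothetical in-memory stand-in for a percolator.
// Queries are simple case-insensitive keywords rather than real search queries.
class KeywordPercolator
{
    private $queries = array();

    // Register a named query, as the control socket does on the slide.
    public function addQuery($name, $keyword)
    {
        $this->queries[$name] = strtolower($keyword);
    }

    // Return the names of all registered queries that match the text.
    public function percolate($text)
    {
        $matches = array();
        $text = strtolower($text);
        foreach ($this->queries as $name => $keyword) {
            if (strpos($text, $keyword) !== false) {
                $matches[] = $name;
            }
        }
        return $matches;
    }
}

$p = new KeywordPercolator();
$p->addQuery('coffee-fans', 'coffee');
$p->addQuery('php-talk', 'php');
$p->addQuery('weather', 'rain');

$topics = $p->percolate('Drinking coffee at PHP Tek');
```

Each returned name can then serve as the topic frame when the message is published, so subscribers receive only the messages that match their query.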
Data Source Output Queue Process Filter Data Store Data Store
STORE PHP KAFKA TOPIC TOPIC 1 2 3 4 1
2 3 4 SUB APACHE PHP CLIENT HTTP GET TOPIC - OFFSET
PHP KAFKA TOPIC TOPIC 1 2 3 4 1 2 3 4 SUB
kafkastore.php
$k = new Kafka_Producer("localhost", 9092);
while ($data = $in->recvMulti()) {
    $topic = $data[0];
    $msg = $data[1];
    $bytes = $k->send(array($msg), $topic);
}
KAFKA TOPIC TOPIC 1 2 3 4 1 2 3 4 APACHE PHP CLIENT GET
kafkaconsume.php
// $top (topic), $os (starting offset) and $max are set elsewhere
$consumer = new Kafka_SimpleConsumer('localhost', 9092, 1, $max);
$offset = $os;
do {
    $msgs = $consumer->fetch(new Kafka_FetchRequest($top, 0, $offset, $max));
    foreach ($msgs as $msg) echo $msg->payload(), "\n";
    $offset += $msgs->validBytes();
} while ($msgs->validBytes() > 0);
echo json_encode(array("offset" => $offset));
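The consumer hands the client its new offset, so the client, not the server, remembers how far it has read. A minimal sketch of that contract follows, assuming an array stands in for a Kafka topic. fetch_from() is hypothetical, and it counts messages where the slide's code counts bytes via validBytes().

```php
<?php
// Sketch only: an array stands in for a Kafka topic; fetch_from() is hypothetical.
// It returns up to $max messages starting at $offset, plus the offset to use next.
function fetch_from(array $log, $offset, $max)
{
    $messages = array_slice($log, $offset, $max);
    return array(
        'messages' => $messages,
        'offset'   => $offset + count($messages),
    );
}

$log = array('m1', 'm2', 'm3', 'm4', 'm5');

// First request starts from the beginning of the topic.
$first = fetch_from($log, 0, 2);

// The client sends back the offset it was given to resume where it left off.
$second = fetch_from($log, $first['offset'], 10);
```

Because the offset travels with each request, a client that disconnects can replay or resume from any point without the server keeping per-client state.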
OPS
JSON & MSGPACK
$data = array('id' => 1, 'a' => 'a', 'b' => 'xyz',
    'c' => array(1, 2, "abcdefg", array(5, 7, 8)));
$enc = json_encode($data);
var_dump(json_decode($enc));
$enc = msgpack_pack($data);
var_dump(msgpack_unpack($enc));
JSON MSGPACK MSGPACK JSON
Data Source Output Queue Process Filter Data Store Tap Trace
Trace Trace
Data Source Output See Also: http://slidesha.re/JaWE78
ian barber - @ianbarber - https://github.com/ianbarber/Firehose-PHP-Talk
THANKS!