Building A Firehose - PHPNW
Ian Barber
October 06, 2012
#phpnw12 version of my talk on building firehose style streaming data systems
Transcript
+Ian Barber - [email protected] - @ianbarber
https://github.com/ianbarber/Firehose-PHP-Talk
BUILDING A FIREHOSE
FILTERABLE REAL TIME STREAMING DATA
SELLING DATA ANALYSIS & DECISIONS USER TOOLS $£¥ ☑☒
DATA SOURCES COMPOSE latency AUGMENT STORE FILTER STREAM
EVENT SAMPLE order tweet temperature snapshot
Data Source Data Source Data Source Output
Data Source Data Source Data Source Output Output
Data Source Data Source Data Source Output Messaging Batch HTTP
Logs HTTP Chunked Websockets Batched POST
APACHE PHP APACHE PHP NODE.JS PUSH ZEROMQ PULL HTTP POST WEBSOCKETS
APACHE PHP APACHE PHP HTTP POST
location.php
    function sendPos() {
      navigator.geolocation.getCurrentPosition(function(pos) {
        $.ajax({
          type: 'POST',
          url: 'http://firehose.com/input.php',
          data: {lat: pos.coords.latitude, lon: pos.coords.longitude}
        });
      });
      setTimeout(sendPos, 60000);
    }
    sendPos();
APACHE PHP APACHE PHP PUSH ZEROMQ WEBSOCKETS
input.php
    $ctx = new ZMQContext();
    $sock = $ctx->getSocket(ZMQ::SOCKET_PUSH);
    $sock->connect("tcp://localhost:5566");
    $data = array(
      'id' => get_next_msg_id(),
      'uid' => $_COOKIE['uid'],
      'lat' => $_POST['lat'],
      'lon' => $_POST['lon']
    );
    $sock->send(json_encode($data));
APACHE PHP APACHE PHP NODE.JS ZEROMQ PULL WEBSOCKETS
output.js
    // handler is the HTTP request handler (not shown on the slide)
    var app = require('http').createServer(handler),
        io = require('socket.io').listen(app),
        zmq = require('zmq'),
        sock = zmq.socket('pull');
    app.listen(8080);
    sock.bind('tcp://*:5566');
    sock.on('message', function (msg) {
      var data = JSON.parse(msg);
      // send to all clients
      io.sockets.emit("position", data);
    });
PHP DAEMON PHP DAEMON NODE.JS PUSH ZEROMQ PULL HTTP STREAM WEBSOCKETS
twitter.php
    $fh = fopen("https://".$user.":".$pass.
      "@stream.twitter.com/1/statuses/filter.json?track=".$search, "r");
    while(!feof($fh)) {
      $d = fgets($fh);
      if(strlen($d) > 4) {
        $sock->send($d);
      }
    }
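The twitter.php loop leans on fgets to turn a long-lived HTTP stream into one message per line, and on the length check to drop short keep-alive lines. A minimal sketch of that framing step in JavaScript, with the buffering made explicit, might look like the following. The function name createLineSplitter and the sample chunks are illustrative assumptions, and the keep-alive lines are assumed to be bare "\r\n".

```javascript
// Splits an arbitrary sequence of stream chunks into complete lines.
// MIN_LENGTH mirrors the strlen($d) > 4 check on the slide; everything
// else here is an illustrative assumption.
const MIN_LENGTH = 4;

function createLineSplitter(onMessage) {
  let buffer = '';
  return function feed(chunk) {
    buffer += chunk;
    let newline;
    while ((newline = buffer.indexOf('\n')) !== -1) {
      const line = buffer.slice(0, newline + 1);
      buffer = buffer.slice(newline + 1);
      // A keep-alive line is just "\r\n", so it fails the length check.
      if (line.length > MIN_LENGTH) {
        onMessage(line.trim());
      }
    }
  };
}

// Chunk boundaries deliberately fall in the middle of messages.
const received = [];
const feed = createLineSplitter((line) => received.push(JSON.parse(line)));
feed('{"text":"hel');
feed('lo"}\r\n\r\n{"text":');
feed('"world"}\r\n');
```

Chunks can end anywhere, so the splitter only emits a message once its terminating newline has arrived.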
Data Source Data Source Output Assemble Process Process
SOURCE ASSEMBLE PHP PHP ZEROMQ PUB SUB SUB SUB REDIS ZEROMQ PUSH
SOURCE PHP ZEROMQ PUB SUB REDIS
augmentor.php
    // $obj (the augmentor's helper object) and $redis are created elsewhere
    $ctx = new ZMQContext();
    $sub = $ctx->getSocket(ZMQ::SOCKET_SUB);
    $sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
    $sub->connect("tcp://localhost:5577");
    while( $dat = $sub->recv() ) {
      // decode once so the message id can be used as the Redis key
      $msg = json_decode($dat, true);
      $aug = augment($msg, $obj);
      $redis->lpush($msg['id'], json_encode($aug));
    }
SOURCE PHP REDIS DB
starbucks.php
    $mongo = new Mongo();
    $collection = $mongo->starbucks->locations;
    function augment($data, $collection) {
      $loc = array((float) $data['lon'], (float) $data['lat']);
      // nearest stored location to the incoming position
      $res = $collection->findOne(array(
        'loc' => array('$near' => $loc)));
      return array('name' => 'starbucks', 'val' => $res['street']);
    }
SOURCE PHP REDIS
langdetect.php
    $ld = new Text_LanguageDetect();
    $ld->setNameMode(2);
    function augment($data, $ld) {
      /* ["en"]=> float(0.24702222222222) */
      $names = $ld->detect($data['text'], 1);
      return array('name' => 'lang', 'val' => key($names));
    }
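Both augmentors follow the same contract: take a decoded message, return a small name/val pair, and push it onto a list keyed by the message id. A rough in-memory sketch of that fan-out, in JavaScript, could look like this. The Map-based store, the lpush helper, the toy augmentors and the sample values are all hypothetical stand-ins for Redis, MongoDB and Text_LanguageDetect.

```javascript
// Hypothetical in-memory stand-in for Redis lists keyed by message id.
const store = new Map();

function lpush(key, value) {
  if (!store.has(key)) store.set(key, []);
  store.get(key).unshift(value); // newest entry first, like LPUSH
}

// Toy augmentors with the same return shape as the ones on the slides.
const augmentors = [
  // Fixed answer instead of a real geo query (made-up value).
  (msg) => ({ name: 'starbucks', val: 'High Street' }),
  // ASCII-only text is labelled "en", anything else "unknown".
  (msg) => ({ name: 'lang', val: /^[\x00-\x7f]*$/.test(msg.text) ? 'en' : 'unknown' }),
];

function publish(raw) {
  const msg = JSON.parse(raw);
  // PUB/SUB: every augmentor sees every message and works independently.
  for (const augment of augmentors) {
    lpush(msg.id, JSON.stringify(augment(msg)));
  }
}

publish(JSON.stringify({ id: 'm1', text: 'hello', lat: 53.48, lon: -2.24 }));
```

Because each result lands under the message id, a later stage can collect them without knowing which services produced them.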
SOURCE ASSEMBLE PHP PHP REDIS PHP ZOOKEEPER COUNT OF SERVICES
    $zk = new Zookeeper();
    $zk->connect("localhost:2181");
PHP ZOOKEEPER
augmentor.php
    $zk->create(
      $path . "/" . uniqid(),
      NULL,
      array(
        array(
          "perms" => Zookeeper::PERM_ALL,
          "scheme" => "world",
          "id" => "anyone")),
      Zookeeper::EPHEMERAL);
REASSEMBLE SOURCE REDIS ZOOKEEPER COUNT
    define("TIMEOUT", 5);
    $ch = $zk->getChildren("/services");
    $servs = count($ch);
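The service count works because each augmentor registers an ephemeral node, which disappears when its session ends, so counting the children of /services gives the number of live augmentors. The behaviour can be modelled in a few lines of JavaScript. The Registry class below is a hypothetical in-memory model, not the ZooKeeper API, and the node names are made up.

```javascript
// Hypothetical in-memory model of ephemeral nodes; only the behaviour
// the slides rely on, not the real ZooKeeper client API.
class Registry {
  constructor() {
    this.nodes = new Map(); // path -> owning session id
    this.nextSession = 1;
  }
  connect() {
    return this.nextSession++;
  }
  createEphemeral(session, path) {
    this.nodes.set(path, session);
  }
  // Closing a session removes every node it created.
  close(session) {
    for (const [path, owner] of this.nodes) {
      if (owner === session) this.nodes.delete(path);
    }
  }
  getChildren(parent) {
    const prefix = parent + '/';
    return [...this.nodes.keys()]
      .filter((p) => p.startsWith(prefix))
      .map((p) => p.slice(prefix.length));
  }
}

const registry = new Registry();
const starbucks = registry.connect();
const langdetect = registry.connect();
registry.createEphemeral(starbucks, '/services/a1');
registry.createEphemeral(langdetect, '/services/b2');
const before = registry.getChildren('/services').length;
registry.close(langdetect); // simulate the language detector going away
const after = registry.getChildren('/services').length;
```

The reassembly stage never needs a hard-coded list of augmentors; it only needs the current child count.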
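REASSEMBLE COUNT
reassemble.php
    while($raw = $sub->recv()) {
      $dat = json_decode($raw, true);
      $dat['aug'] = array();
      $time = TIMEOUT; // time budget per message, in seconds
      do {
        $start = microtime(true);
        // wait for the next augmentation, but no longer than the time left
        $aug = $redis->brpop($dat['id'], $time);
        if(!empty($aug)) $dat['aug'][] = $aug;
        $time -= microtime(true) - $start;
      } while($time > 0 && count($dat['aug']) != $servs);
      $out->send(json_encode($dat)); // forward
    }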
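The reassembly loop has two exit conditions: every registered service has answered, or the time budget has run out, in which case the message is forwarded with whatever augmentations arrived. A small JavaScript sketch of that decision, using a simulated list of arrival times instead of a blocking pop, is shown here. The reassemble function, the arrival format and the sample values are illustrative assumptions.

```javascript
// arrivals: [{ at: secondsAfterStart, value: {...} }], sorted by `at`.
// A simulated clock replaces the blocking BRPOP so the logic runs anywhere.
function reassemble(msg, arrivals, expected, timeout) {
  const out = { ...msg, aug: [] };
  for (const arrival of arrivals) {
    if (arrival.at > timeout) break;        // time budget exhausted
    out.aug.push(arrival.value);
    if (out.aug.length === expected) break; // every service has answered
  }
  return out;
}

const msg = { id: 'm1', text: 'hello' };
const arrivals = [
  { at: 0.2, value: { name: 'lang', val: 'en' } },
  { at: 6.0, value: { name: 'starbucks', val: 'High Street' } },
];

// One service expected: done as soon as the first result arrives.
const complete = reassemble(msg, arrivals.slice(0, 1), 1, 5);
// Two services expected, but the second misses the 5 second deadline.
const partial = reassemble(msg, arrivals, 2, 5);
```

In the second call the message is still forwarded, just with one augmentation missing, which trades completeness for bounded latency.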
Data Source Data Source Output Assemble Process Process Filter Filter Filter
FILTER ELASTIC SEARCH QUERY - NAME QUERY - NAME MSG ? ? MSG ZEROMQ SUB ZEROMQ PUB TOPIC MSG TOPIC MSG TOPIC MSG ZEROMQ PULL QUERY - NAME MSG HTTP / REST
ELASTIC SEARCH QUERY - NAME QUERY - NAME MSG MSG HTTP / REST
elasticsearch.php
    function escall($server, $path, $param) {
      // $param holds the HTTP context options, e.g. the request body
      $context = stream_context_create(
        array('http' => $param));
      $result = file_get_contents(
        $server.'/'.$path, NULL, $context);
      // decode to an array so callers can index the result
      return json_decode($result, true);
    }
ELASTIC SEARCH QUERY - NAME QUERY - NAME MSG MSG HTTP / REST
elasticsearch.php
    function percolate($host, $tweet) {
      // no leading slash: escall() joins host and path with "/"
      $path = "twitter/tweet/_percolate";
      $tweet = array('doc' => array(
        'tweet' => $tweet['text']));
      $match = escall($host, $path,
        array('content' => json_encode($tweet)));
      // names of the registered queries that matched
      return $match['matches'];
    }
FILTER MSG ZEROMQ PULL QUERY - NAME ZEROMQ SUB
elasticsearch.php
    // snip... creating in, ctl, out ZMQ socks
    $poll = new ZMQPoll();
    $poll->add($in, ZMQ::POLL_IN);
    $poll->add($ctl, ZMQ::POLL_IN);
    $read = $write = array();
elasticsearch.php
    while(true) {
      $ev = $poll->poll($read, $write, -1);
      // both sockets may be readable after a single poll
      foreach($read as $r) {
        if($r === $in) {
          $raw = $in->recv();
          $msg = json_decode($raw, true);
          $matches = percolate($host, $msg);
          foreach($matches as $match) {
            // topic frame first, then the original message
            $out->sendMulti(array($match, $raw));
          }
        } else if($r === $ctl) {
          $q = json_decode($ctl->recv(), true);
          $name = $q['name'];
          $query = $q['query'];
          add_query($host, $name, $query);
        }
      }
    }
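The filter stage inverts a normal search: queries are registered up front under a name, each incoming message is tested against all of them, and the message is republished once per matching query with the query name as the topic. A toy JavaScript version of that idea follows. The keyword-list queries, the addQuery, percolate and route helpers and the sample data are hypothetical simplifications, far cruder than real Elasticsearch percolation.

```javascript
// Hypothetical in-memory percolator: a query is a list of keywords that
// must all appear in the message text.
const queries = new Map();

function addQuery(name, keywords) {
  queries.set(name, keywords.map((k) => k.toLowerCase()));
}

// Returns the names of every registered query the message satisfies.
function percolate(msg) {
  const text = msg.text.toLowerCase();
  const matches = [];
  for (const [name, keywords] of queries) {
    if (keywords.every((k) => text.includes(k))) matches.push(name);
  }
  return matches;
}

// One [topic, payload] frame pair per match, as with sendMulti on the slide.
function route(raw) {
  return percolate(JSON.parse(raw)).map((name) => [name, raw]);
}

addQuery('php-news', ['php']);
addQuery('zeromq-php', ['zeromq', 'php']);
const frames = route(JSON.stringify({ id: 'm1', text: 'Talking PHP at PHPNW' }));
```

Subscribers then only need to subscribe to the topic named after their own query.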
Data Source Output Queue Process Filter Data Store Data Store
STORE PHP KAFKA TOPIC TOPIC 1 2 3 4 1 2 3 4 SUB APACHE PHP CLIENT HTTP GET TOPIC - OFFSET
PHP KAFKA TOPIC TOPIC 1 2 3 4 1 2 3 4 SUB
kafkastore.php
    $k = new Kafka_Producer("localhost", 9092);
    while ($data = $in->recvMulti()) {
      $topic = $data[0];
      $msg = $data[1];
      $bytes = $k->send(array($msg), $topic);
    }
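KAFKA TOPIC TOPIC 1 2 3 4 1 2 3 4 APACHE PHP CLIENT GET
kafkaconsume.php
    // $top, $offset and $max come from the client's request (not shown)
    $consumer = new Kafka_SimpleConsumer(
      'localhost', 9092, 1, $max);
    do {
      // fetch from the current offset so each pass reads new messages
      $msgs = $consumer->fetch(
        new Kafka_FetchRequest($top, 0, $offset, $max)
      );
      foreach($msgs as $msg)
        echo $msg->payload(), "\n";
      $offset += $msgs->validBytes();
    } while($msgs->validBytes() > 0);
    // hand the new offset back so the client can resume from it
    echo json_encode(array("offset" => $offset));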
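The store stage keeps no per-client state: the client sends the offset it last saw, reads everything after it, and gets a new offset back to use on its next request. A stripped-down JavaScript sketch of that contract is given below. The TopicLog class is hypothetical, and its offsets are message indexes, whereas the Kafka client on the slide advances by byte counts.

```javascript
// Hypothetical append-only log; offsets here are message indexes.
class TopicLog {
  constructor() {
    this.messages = [];
  }
  append(msg) {
    this.messages.push(msg);
    return this.messages.length;
  }
  // Returns up to `max` messages from `offset`, plus the offset to resume from.
  fetch(offset, max) {
    const batch = this.messages.slice(offset, offset + max);
    return { messages: batch, offset: offset + batch.length };
  }
}

const log = new TopicLog();
['m1', 'm2', 'm3'].forEach((m) => log.append(m));

const first = log.fetch(0, 2);              // client starts from the beginning
log.append('m4');                           // new data arrives in the meantime
const second = log.fetch(first.offset, 10); // client resumes where it left off
```

A client that disconnects can come back later with its saved offset and miss nothing that is still retained.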
OPS
JSON & MSGPACK
    $data = array('id'=>1,'a'=>'a','b'=>'xyz',
      'c' => array(1, 2, "abcdefg", array(5, 7, 8)));
    $enc = json_encode($data);
    var_dump( json_decode($enc) );
    $enc = msgpack_pack($data);
    var_dump( msgpack_unpack($enc) );
JSON MSGPACK MSGPACK JSON
Data Source Output Queue Process Filter Data Store Tap Trace Trace Trace
Data Source Output See Also: http://slidesha.re/JaWE78
+Ian Barber - [email protected] - @ianbarber
https://github.com/ianbarber/Firehose-PHP-Talk
THANKS!